[gprpp] absl::Notification polyfill (#31008)

* [gprpp] absl::Notification polyfill

* grpc_core::

* Automated change: Fix sanity tests

* fix

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30996/head
Craig Tiller 3 years ago committed by GitHub
parent 868b7d82a7
commit d9ac89a441
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      BUILD
  2. 36
      CMakeLists.txt
  3. 13
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 4
      src/core/ext/transport/binder/wire_format/wire_reader_impl.h
  9. 67
      src/core/lib/gprpp/notification.h
  10. 2
      src/core/lib/surface/server.cc
  11. 9
      src/core/lib/surface/server.h
  12. 12
      test/core/gprpp/BUILD
  13. 88
      test/core/gprpp/notification_test.cc
  14. 4
      test/core/gprpp/work_serializer_test.cc
  15. 4
      test/core/memory_usage/callback_client.cc
  16. 14
      test/core/promise/sleep_test.cc
  17. 5
      test/core/surface/init_test.cc
  18. 12
      test/core/transport/binder/binder_transport_test.cc
  19. 11
      test/core/transport/chttp2/graceful_shutdown_test.cc
  20. 11
      test/core/transport/chttp2/streams_not_seen_test.cc
  21. 4
      test/cpp/end2end/client_lb_end2end_test.cc
  22. 5
      test/cpp/end2end/test_service_impl.cc
  23. 1
      tools/doxygen/Doxyfile.c++.internal
  24. 1
      tools/doxygen/Doxyfile.core.internal
  25. 24
      tools/run_tests/generated/tests.json

10
BUILD

@ -3355,6 +3355,7 @@ grpc_cc_library(
"json",
"latch",
"memory_quota",
"notification",
"orphanable",
"packed_table",
"poll",
@ -3616,6 +3617,15 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "notification",
hdrs = [
"src/core/lib/gprpp/notification.h",
],
external_deps = ["absl/time"],
deps = ["gpr"],
)
grpc_cc_library(
name = "channel_args",
srcs = [

36
CMakeLists.txt generated

@ -1059,6 +1059,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx murmur_hash_test)
add_dependencies(buildtests_cxx no_destruct_test)
add_dependencies(buildtests_cxx nonblocking_test)
add_dependencies(buildtests_cxx notification_test)
add_dependencies(buildtests_cxx num_external_connectivity_watchers_test)
add_dependencies(buildtests_cxx observable_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -13748,6 +13749,41 @@ target_link_libraries(nonblocking_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(notification_test
test/core/gprpp/notification_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(notification_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(notification_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
gpr
)
endif()
if(gRPC_BUILD_TESTS)

@ -762,6 +762,7 @@ libs:
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/match.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/overload.h
- src/core/lib/gprpp/packed_table.h
@ -1959,6 +1960,7 @@ libs:
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/match.h
- src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/overload.h
- src/core/lib/gprpp/packed_table.h
@ -7853,6 +7855,17 @@ targets:
- test/cpp/end2end/nonblocking_test.cc
deps:
- grpc++_test_util
- name: notification_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/gprpp/notification.h
src:
- test/core/gprpp/notification_test.cc
deps:
- gpr
uses_polling: false
- name: num_external_connectivity_watchers_test
gtest: true
build: test

2
gRPC-C++.podspec generated

@ -729,6 +729,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mpscq.h',
'src/core/lib/gprpp/no_destruct.h',
'src/core/lib/gprpp/notification.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/overload.h',
'src/core/lib/gprpp/packed_table.h',
@ -1591,6 +1592,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mpscq.h',
'src/core/lib/gprpp/no_destruct.h',
'src/core/lib/gprpp/notification.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/overload.h',
'src/core/lib/gprpp/packed_table.h',

2
gRPC-Core.podspec generated

@ -1152,6 +1152,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/mpscq.cc',
'src/core/lib/gprpp/mpscq.h',
'src/core/lib/gprpp/no_destruct.h',
'src/core/lib/gprpp/notification.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/overload.h',
'src/core/lib/gprpp/packed_table.h',
@ -2218,6 +2219,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mpscq.h',
'src/core/lib/gprpp/no_destruct.h',
'src/core/lib/gprpp/notification.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/overload.h',
'src/core/lib/gprpp/packed_table.h',

1
grpc.gemspec generated

@ -1065,6 +1065,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/mpscq.cc )
s.files += %w( src/core/lib/gprpp/mpscq.h )
s.files += %w( src/core/lib/gprpp/no_destruct.h )
s.files += %w( src/core/lib/gprpp/notification.h )
s.files += %w( src/core/lib/gprpp/orphanable.h )
s.files += %w( src/core/lib/gprpp/overload.h )
s.files += %w( src/core/lib/gprpp/packed_table.h )

1
package.xml generated

@ -1047,6 +1047,7 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/mpscq.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/mpscq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/no_destruct.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/notification.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/orphanable.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/overload.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/packed_table.h" role="src" />

@ -21,7 +21,6 @@
#include <utility>
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/notification.h"
#include <grpcpp/security/binder_security_policy.h>
@ -29,6 +28,7 @@
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include "src/core/lib/gprpp/notification.h"
namespace grpc_binder {
@ -105,7 +105,7 @@ class WireReaderImpl : public WireReader {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_;
absl::Notification connection_noti_;
grpc_core::Notification connection_noti_;
grpc_core::Mutex mu_;
bool connected_ ABSL_GUARDED_BY(mu_) = false;
bool recvd_setup_transport_ ABSL_GUARDED_BY(mu_) = false;

@ -0,0 +1,67 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_GPRPP_NOTIFICATION_H
#define GRPC_CORE_LIB_GPRPP_NOTIFICATION_H
#include <grpc/support/port_platform.h>
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
// Polyfill for absl::Notification until we can use that type.
class Notification {
public:
void Notify() {
MutexLock lock(&mu_);
notified_ = true;
cv_.SignalAll();
}
void WaitForNotification() {
MutexLock lock(&mu_);
while (!notified_) {
cv_.Wait(&mu_);
}
}
bool WaitForNotificationWithTimeout(absl::Duration timeout) {
auto now = absl::Now();
auto deadline = now + timeout;
MutexLock lock(&mu_);
while (!notified_ && now < deadline) {
cv_.WaitWithTimeout(&mu_, deadline - now);
now = absl::Now();
}
return notified_;
}
bool HasBeenNotified() {
MutexLock lock(&mu_);
return notified_;
}
private:
Mutex mu_;
CondVar cv_;
bool notified_ = false;
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_GPRPP_NOTIFICATION_H

@ -790,7 +790,7 @@ void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
// connection is NOT closed until the server is done with all those calls.
// -- Once there are no more calls in progress, the channel is closed.
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
absl::Notification* await_requests = nullptr;
Notification* await_requests = nullptr;
ChannelBroadcaster broadcaster;
{
// Wait for startup to be finished. Locks mu_global.

@ -33,7 +33,6 @@
#include "absl/base/thread_annotations.h"
#include "absl/memory/memory.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/notification.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
@ -49,6 +48,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/cpp_impl_of.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -419,14 +419,14 @@ class Server : public InternallyRefCounted<Server>,
}
// Returns a notification pointer to wait on if there are requests in-flight,
// or null.
absl::Notification* ShutdownUnrefOnShutdownCall()
Notification* ShutdownUnrefOnShutdownCall()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) GRPC_MUST_USE_RESULT {
if (shutdown_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
// There is no request in-flight.
MaybeFinishShutdown();
return nullptr;
}
requests_complete_ = absl::make_unique<absl::Notification>();
requests_complete_ = absl::make_unique<Notification>();
return requests_complete_.get();
}
@ -478,8 +478,7 @@ class Server : public InternallyRefCounted<Server>,
std::atomic<int> shutdown_refs_{1};
bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);
std::unique_ptr<absl::Notification> requests_complete_
ABSL_GUARDED_BY(mu_global_);
std::unique_ptr<Notification> requests_complete_ ABSL_GUARDED_BY(mu_global_);
std::list<ChannelData*> channels_;

@ -401,3 +401,15 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "notification_test",
srcs = ["notification_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = ["//:notification"],
)

@ -0,0 +1,88 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/gprpp/notification.h"
#include <stdlib.h>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
namespace grpc_core {
namespace testing {
namespace {
TEST(Notification, Works) {
Notification n;
EXPECT_FALSE(n.HasBeenNotified());
n.Notify();
EXPECT_TRUE(n.HasBeenNotified());
n.WaitForNotification();
EXPECT_TRUE(n.HasBeenNotified());
}
TEST(Notification, Waits) {
Notification n;
auto start = absl::Now();
std::thread t([&n] {
absl::SleepFor(absl::Seconds(5));
n.Notify();
});
n.WaitForNotification();
auto end = absl::Now();
EXPECT_GE(end - start, absl::Seconds(5));
t.join();
}
TEST(Notification, WaitsWithTimeout) {
Notification n;
auto start = absl::Now();
std::thread t([&n] {
absl::SleepFor(absl::Seconds(5));
n.Notify();
});
EXPECT_TRUE(n.WaitForNotificationWithTimeout(absl::Seconds(10)));
auto end = absl::Now();
EXPECT_GE(end - start, absl::Seconds(5));
EXPECT_LE(end - start, absl::Seconds(10));
t.join();
}
TEST(Notification, WaitWithTimeoutCanFinishEarly) {
Notification n;
auto start = absl::Now();
std::thread t([&n] {
absl::SleepFor(absl::Seconds(5));
n.Notify();
});
EXPECT_FALSE(n.WaitForNotificationWithTimeout(absl::Seconds(1)));
auto end = absl::Now();
EXPECT_GE(end - start, absl::Seconds(1));
EXPECT_LE(end - start, absl::Seconds(5));
n.WaitForNotification();
end = absl::Now();
EXPECT_GE(end - start, absl::Seconds(5));
t.join();
}
} // namespace
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -25,13 +25,13 @@
#include "absl/memory/memory.h"
#include "absl/synchronization/barrier.h"
#include "absl/synchronization/notification.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/executor.h"
#include "test/core/util/test_config.h"
@ -205,7 +205,7 @@ TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
for (int i = 0; i < 1000; ++i) {
auto lock = std::make_shared<grpc_core::WorkSerializer>();
absl::Notification notification;
grpc_core::Notification notification;
std::thread t1([&]() {
notification.WaitForNotification();
lock.reset();

@ -28,7 +28,6 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
@ -36,6 +35,7 @@
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/gprpp/notification.h"
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "test/core/memory_usage/memstats.h"
@ -74,7 +74,7 @@ struct CallParams {
grpc::testing::SimpleRequest request;
grpc::testing::SimpleResponse response;
grpc::testing::MemorySize snapshot_response;
absl::Notification done;
grpc_core::Notification done;
};
// Simple Unary RPC to send to confirm connection is open

@ -20,11 +20,11 @@
#include <utility>
#include <vector>
#include "absl/synchronization/notification.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/race.h"
@ -35,7 +35,7 @@ namespace {
TEST(Sleep, Zzzz) {
ExecCtx exec_ctx;
absl::Notification done;
Notification done;
Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
// Sleep for one second then set done to true.
auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(),
@ -50,7 +50,7 @@ TEST(Sleep, Zzzz) {
TEST(Sleep, AlreadyDone) {
ExecCtx exec_ctx;
absl::Notification done;
Notification done;
Timestamp done_time = Timestamp::Now() - Duration::Seconds(1);
// Sleep for no time at all then set done to true.
auto activity = MakeActivity(Sleep(done_time), InlineWakeupScheduler(),
@ -63,7 +63,7 @@ TEST(Sleep, AlreadyDone) {
TEST(Sleep, Cancel) {
ExecCtx exec_ctx;
absl::Notification done;
Notification done;
Timestamp done_time = Timestamp::Now() + Duration::Seconds(1);
// Sleep for one second but race it to complete immediately
auto activity = MakeActivity(
@ -80,7 +80,7 @@ TEST(Sleep, Cancel) {
TEST(Sleep, MoveSemantics) {
// ASAN should help determine if there are any memory leaks here
ExecCtx exec_ctx;
absl::Notification done;
Notification done;
Timestamp done_time = Timestamp::Now() + Duration::Milliseconds(111);
Sleep donor(done_time);
Sleep sleeper = std::move(donor);
@ -98,11 +98,11 @@ TEST(Sleep, StressTest) {
// Kick off a bunch sleeps for one second.
static const int kNumActivities = 100000;
ExecCtx exec_ctx;
std::vector<std::shared_ptr<absl::Notification>> notifications;
std::vector<std::shared_ptr<Notification>> notifications;
std::vector<ActivityPtr> activities;
gpr_log(GPR_INFO, "Starting %d sleeps for 1sec", kNumActivities);
for (int i = 0; i < kNumActivities; i++) {
auto notification = std::make_shared<absl::Notification>();
auto notification = std::make_shared<Notification>();
auto activity = MakeActivity(
Sleep(Timestamp::Now() + Duration::Seconds(1)),
ExecCtxWakeupScheduler(),

@ -20,13 +20,12 @@
#include <gtest/gtest.h>
#include "absl/synchronization/notification.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
@ -142,7 +141,7 @@ TEST(Init, repeatedly_blocking) {
TEST(Init, TimerManagerHoldsLastInit) {
grpc_init();
absl::Notification n;
grpc_core::Notification n;
grpc_event_engine::experimental::GetDefaultEventEngine()->RunAfter(
std::chrono::seconds(1), [&n] {
grpc_shutdown();

@ -27,12 +27,12 @@
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "absl/strings/str_join.h"
#include "absl/synchronization/notification.h"
#include <grpc/grpc.h>
#include <grpcpp/security/binder_security_policy.h>
#include "src/core/ext/transport/binder/transport/binder_stream.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/transport/binder/mock_objects.h"
#include "test/core/util/test_config.h"
@ -111,7 +111,7 @@ void MockCallback(void* arg, grpc_error_handle error);
class MockGrpcClosure {
public:
explicit MockGrpcClosure(absl::Notification* notification = nullptr)
explicit MockGrpcClosure(grpc_core::Notification* notification = nullptr)
: notification_(notification) {
GRPC_CLOSURE_INIT(&closure_, MockCallback, this, nullptr);
}
@ -119,7 +119,7 @@ class MockGrpcClosure {
grpc_closure* GetGrpcClosure() { return &closure_; }
MOCK_METHOD(void, Callback, (grpc_error_handle), ());
absl::Notification* notification_;
grpc_core::Notification* notification_;
private:
grpc_closure closure_;
@ -288,7 +288,7 @@ struct MakeRecvInitialMetadata {
grpc_core::ScopedArenaPtr arena =
grpc_core::MakeScopedArena(1024, g_memory_allocator);
grpc_metadata_batch grpc_initial_metadata{arena.get()};
absl::Notification notification;
grpc_core::Notification notification;
};
struct MakeRecvMessage {
@ -306,7 +306,7 @@ struct MakeRecvMessage {
}
MockGrpcClosure ready;
absl::Notification notification;
grpc_core::Notification notification;
absl::optional<grpc_core::SliceBuffer> grpc_message;
};
@ -332,7 +332,7 @@ struct MakeRecvTrailingMetadata {
grpc_core::ScopedArenaPtr arena =
grpc_core::MakeScopedArena(1024, g_memory_allocator);
grpc_metadata_batch grpc_trailing_metadata{arena.get()};
absl::Notification notification;
grpc_core::Notification notification;
};
const Metadata kDefaultMetadata = {

@ -34,7 +34,6 @@
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest.h"
@ -51,6 +50,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
@ -101,7 +101,7 @@ class GracefulShutdownTest : public ::testing::Test {
nullptr) == GRPC_ERROR_NONE);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
// Start polling on the client
absl::Notification client_poller_thread_started_notification;
Notification client_poller_thread_started_notification;
client_poll_thread_ = absl::make_unique<std::thread>(
[this, &client_poller_thread_started_notification]() {
grpc_completion_queue* client_cq =
@ -225,7 +225,7 @@ class GracefulShutdownTest : public ::testing::Test {
}
void WriteBuffer(grpc_slice_buffer* buffer) {
absl::Notification on_write_done_notification_;
Notification on_write_done_notification_;
GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
&on_write_done_notification_, nullptr);
grpc_endpoint_write(fds_.client, buffer, &on_write_done_, nullptr,
@ -237,8 +237,7 @@ class GracefulShutdownTest : public ::testing::Test {
static void OnWriteDone(void* arg, grpc_error_handle error) {
GPR_ASSERT(GRPC_ERROR_IS_NONE(error));
absl::Notification* on_write_done_notification_ =
static_cast<absl::Notification*>(arg);
Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
on_write_done_notification_->Notify();
}
@ -251,7 +250,7 @@ class GracefulShutdownTest : public ::testing::Test {
grpc_closure on_read_done_;
Mutex mu_;
CondVar read_cv_;
absl::Notification read_end_notification_;
Notification read_end_notification_;
grpc_slice_buffer read_buffer_;
std::string read_bytes_ ABSL_GUARDED_BY(mu_);
grpc_closure on_write_done_;

@ -36,7 +36,6 @@
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "gtest/gtest.h"
@ -60,6 +59,7 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
@ -350,7 +350,7 @@ class StreamsNotSeenTest : public ::testing::Test {
}
void WriteBuffer(grpc_slice_buffer* buffer) {
absl::Notification on_write_done_notification_;
Notification on_write_done_notification_;
GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
&on_write_done_notification_, nullptr);
grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
@ -362,8 +362,7 @@ class StreamsNotSeenTest : public ::testing::Test {
static void OnWriteDone(void* arg, grpc_error_handle error) {
GPR_ASSERT(GRPC_ERROR_IS_NONE(error));
absl::Notification* on_write_done_notification_ =
static_cast<absl::Notification*>(arg);
Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
on_write_done_notification_->Notify();
}
@ -414,11 +413,11 @@ class StreamsNotSeenTest : public ::testing::Test {
test_tcp_server server_;
std::unique_ptr<std::thread> server_poll_thread_;
grpc_endpoint* tcp_ = nullptr;
absl::Notification connect_notification_;
Notification connect_notification_;
grpc_slice_buffer read_buffer_;
grpc_closure on_write_done_;
grpc_closure on_read_done_;
absl::Notification read_end_notification_;
Notification read_end_notification_;
std::string read_bytes_ ABSL_GUARDED_BY(mu_);
grpc_channel* channel_ = nullptr;
grpc_completion_queue* cq_ = nullptr;

@ -1266,7 +1266,7 @@ TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******");
// Send a resolver result with an empty address list and a callback
// that triggers a notification.
absl::Notification notification;
grpc_core::Notification notification;
grpc_core::Resolver::Result result;
result.addresses.emplace();
result.result_health_callback = [&](absl::Status status) {
@ -1677,7 +1677,7 @@ TEST_F(RoundRobinTest, FailsEmptyResolverUpdate) {
gpr_log(GPR_INFO, "****** SENDING INITIAL RESOLVER RESULT *******");
// Send a resolver result with an empty address list and a callback
// that triggers a notification.
absl::Notification notification;
grpc_core::Notification notification;
grpc_core::Resolver::Result result;
result.addresses.emplace();
result.resolution_note = "injected error";

@ -23,13 +23,12 @@
#include <gtest/gtest.h>
#include "absl/synchronization/notification.h"
#include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h>
#include "src/core/lib/gprpp/notification.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/util/string_ref_helper.h"
@ -637,7 +636,7 @@ CallbackTestServiceImpl::BidiStream(CallbackServerContext* context) {
bool setup_done_{false};
std::thread finish_thread_;
bool client_try_cancel_ = false;
absl::Notification cancel_notification_;
grpc_core::Notification cancel_notification_;
};
return new Reactor(context);

@ -2049,6 +2049,7 @@ src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mpscq.cc \
src/core/lib/gprpp/mpscq.h \
src/core/lib/gprpp/no_destruct.h \
src/core/lib/gprpp/notification.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/overload.h \
src/core/lib/gprpp/packed_table.h \

@ -1840,6 +1840,7 @@ src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mpscq.cc \
src/core/lib/gprpp/mpscq.h \
src/core/lib/gprpp/no_destruct.h \
src/core/lib/gprpp/notification.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/overload.h \
src/core/lib/gprpp/packed_table.h \

@ -4787,6 +4787,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "notification_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save