diff --git a/include/grpcpp/impl/codegen/sync.h b/include/grpcpp/impl/codegen/sync.h index 146f182e57b..3731a4eba57 100644 --- a/include/grpcpp/impl/codegen/sync.h +++ b/include/grpcpp/impl/codegen/sync.h @@ -32,6 +32,10 @@ #include +#ifdef GRPCPP_ABSEIL_SYNC +#include "absl/synchronization/mutex.h" +#endif + // The core library is not accessible in C++ codegen headers, and vice versa. // Thus, we need to have duplicate headers with similar functionality. // Make sure any change to this file is also reflected in @@ -44,6 +48,15 @@ namespace grpc { namespace internal { +#ifdef GRPCPP_ABSEIL_SYNC + +using Mutex = absl::Mutex; +using MutexLock = absl::MutexLock; +using ReleasableMutexLock = absl::ReleasableMutexLock; +using CondVar = absl::CondVar; + +#else + class Mutex { public: Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); } @@ -52,8 +65,8 @@ class Mutex { Mutex(const Mutex&) = delete; Mutex& operator=(const Mutex&) = delete; - gpr_mu* get() { return &mu_; } - const gpr_mu* get() const { return &mu_; } + void Lock() { g_core_codegen_interface->gpr_mu_lock(&mu_); } + void Unlock() { g_core_codegen_interface->gpr_mu_unlock(&mu_); } private: union { @@ -63,55 +76,40 @@ class Mutex { pthread_mutex_t do_not_use_pth_; #endif }; + + friend class CondVar; }; -// MutexLock is a std:: class MutexLock { public: - explicit MutexLock(Mutex* mu) : mu_(mu->get()) { - g_core_codegen_interface->gpr_mu_lock(mu_); - } - explicit MutexLock(gpr_mu* mu) : mu_(mu) { - g_core_codegen_interface->gpr_mu_lock(mu_); - } - ~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); } + explicit MutexLock(Mutex* mu) : mu_(mu) { mu_->Lock(); } + ~MutexLock() { mu_->Unlock(); } MutexLock(const MutexLock&) = delete; MutexLock& operator=(const MutexLock&) = delete; private: - gpr_mu* const mu_; + Mutex* const mu_; }; class ReleasableMutexLock { public: - explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { - g_core_codegen_interface->gpr_mu_lock(mu_); - } - explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { - g_core_codegen_interface->gpr_mu_lock(mu_); - } + explicit ReleasableMutexLock(Mutex* mu) : mu_(mu) { mu_->Lock(); } ~ReleasableMutexLock() { - if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_); + if (!released_) mu_->Unlock(); } ReleasableMutexLock(const ReleasableMutexLock&) = delete; ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete; - void Lock() { - GPR_DEBUG_ASSERT(released_); - g_core_codegen_interface->gpr_mu_lock(mu_); - released_ = false; - } - - void Unlock() { + void Release() { GPR_DEBUG_ASSERT(!released_); released_ = true; - g_core_codegen_interface->gpr_mu_unlock(mu_); + mu_->Unlock(); } private: - gpr_mu* const mu_; + Mutex* const mu_; bool released_ = false; }; @@ -124,27 +122,27 @@ class CondVar { CondVar& operator=(const CondVar&) = delete; void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); } - void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); } + void SignalAll() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); } - int Wait(Mutex* mu) { - return Wait(mu, - g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); - } - int Wait(Mutex* mu, const gpr_timespec& deadline) { - return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline); - } - - template - void WaitUntil(Mutex* mu, Predicate pred) { - while (!pred()) { - Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); - } + void Wait(Mutex* mu) { + g_core_codegen_interface->gpr_cv_wait( + &cv_, &mu->mu_, + g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); } private: gpr_cv cv_; }; +#endif // GRPCPP_ABSEIL_SYNC + +template +static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) { + while (!pred()) { + cv->Wait(mu); + } +} + } // namespace internal } // namespace grpc diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 1c5e02edb97..d24e13ff0b6 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -908,7 +908,7 @@ Server::~Server() { { grpc::internal::ReleasableMutexLock lock(&mu_); if (started_ && !shutdown_) { - lock.Unlock(); + lock.Release(); Shutdown(); } else if (!started_) { // Shutdown the completion queues @@ -1084,7 +1084,8 @@ void Server::UnrefAndWaitLocked() { shutdown_done_ = true; return; // no need to wait on CV since done condition already set } - shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); + grpc::internal::WaitUntil(&shutdown_done_cv_, &mu_, + [this] { return shutdown_done_; }); } void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { @@ -1246,7 +1247,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.Broadcast(); + shutdown_cv_.SignalAll(); #ifndef NDEBUG // Unregister this server with the CQs passed into it by the user so that diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 644d782e116..f6ca7d66aef 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -374,7 +374,7 @@ class ClientLbEnd2endTest : public ::testing::Test { grpc::internal::CondVar cond; thread_ = absl::make_unique( std::bind(&ServerData::Serve, this, server_host, &mu, &cond)); - cond.WaitUntil(&mu, [this] { return server_ready_; }); + grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; }); server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); } diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index a2183201839..b9de77bac46 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -273,7 +273,8 @@ class BalancerServiceImpl : public BalancerService { } { grpc::internal::MutexLock lock(&mu_); - serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; }); + grpc::internal::WaitUntil(&serverlist_cond_, &mu_, + [this] { return serverlist_done_; }); } if (client_load_reporting_interval_seconds_ > 0) { @@ -333,8 +334,8 @@ class BalancerServiceImpl : public BalancerService { grpc::internal::CondVar cv; if (load_report_queue_.empty()) { load_report_cond_ = &cv; - load_report_cond_->WaitUntil( - &mu_, [this] { return !load_report_queue_.empty(); }); + grpc::internal::WaitUntil(load_report_cond_, &mu_, + [this] { return !load_report_queue_.empty(); }); load_report_cond_ = nullptr; } ClientStats load_report = std::move(load_report_queue_.front()); @@ -346,7 +347,7 @@ class BalancerServiceImpl : public BalancerService { grpc::internal::MutexLock lock(&mu_); if (!serverlist_done_) { serverlist_done_ = true; - serverlist_cond_.Broadcast(); + serverlist_cond_.SignalAll(); } } diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 1c684ef51e5..f7d23e83678 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -205,7 +205,7 @@ TEST_F(MockCallbackTest, MockedCallSucceedsWithWait) { req.set_message("mock 1"); auto* reactor = service_.Echo(&ctx, &req, &resp); - cv.WaitUntil(&mu, [&] { + grpc::internal::WaitUntil(&cv, &mu, [&] { grpc::internal::MutexLock l(&mu); return status_set; }); diff --git a/test/cpp/end2end/service_config_end2end_test.cc b/test/cpp/end2end/service_config_end2end_test.cc index 07f383a14a9..ceae2ed7bb8 100644 --- a/test/cpp/end2end/service_config_end2end_test.cc +++ b/test/cpp/end2end/service_config_end2end_test.cc @@ -319,7 +319,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test { grpc::internal::CondVar cond; thread_ = absl::make_unique( std::bind(&ServerData::Serve, this, server_host, &mu, &cond)); - cond.WaitUntil(&mu, [this] { return server_ready_; }); + grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; }); server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); }