Merge pull request #25387 from veblush/mutex-cpp

Made grpc::internal::Mutex compatible to absl::Mutex
pull/25441/head
Esun Kim 4 years ago committed by GitHub
commit 8b2bd69ef3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 80
      include/grpcpp/impl/codegen/sync.h
  2. 7
      src/cpp/server/server_cc.cc
  3. 2
      test/cpp/end2end/client_lb_end2end_test.cc
  4. 9
      test/cpp/end2end/grpclb_end2end_test.cc
  5. 2
      test/cpp/end2end/mock_test.cc
  6. 2
      test/cpp/end2end/service_config_end2end_test.cc

@ -32,6 +32,10 @@
#include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/core_codegen_interface.h>
#ifdef GRPCPP_ABSEIL_SYNC
#include "absl/synchronization/mutex.h"
#endif
// The core library is not accessible in C++ codegen headers, and vice versa. // The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality. // Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in // Make sure any change to this file is also reflected in
@ -44,6 +48,15 @@
namespace grpc { namespace grpc {
namespace internal { namespace internal {
#ifdef GRPCPP_ABSEIL_SYNC
using Mutex = absl::Mutex;
using MutexLock = absl::MutexLock;
using ReleasableMutexLock = absl::ReleasableMutexLock;
using CondVar = absl::CondVar;
#else
class Mutex { class Mutex {
public: public:
Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); } Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); }
@ -52,8 +65,8 @@ class Mutex {
Mutex(const Mutex&) = delete; Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete; Mutex& operator=(const Mutex&) = delete;
gpr_mu* get() { return &mu_; } void Lock() { g_core_codegen_interface->gpr_mu_lock(&mu_); }
const gpr_mu* get() const { return &mu_; } void Unlock() { g_core_codegen_interface->gpr_mu_unlock(&mu_); }
private: private:
union { union {
@ -63,55 +76,40 @@ class Mutex {
pthread_mutex_t do_not_use_pth_; pthread_mutex_t do_not_use_pth_;
#endif #endif
}; };
friend class CondVar;
}; };
// MutexLock is a std::
class MutexLock { class MutexLock {
public: public:
explicit MutexLock(Mutex* mu) : mu_(mu->get()) { explicit MutexLock(Mutex* mu) : mu_(mu) { mu_->Lock(); }
g_core_codegen_interface->gpr_mu_lock(mu_); ~MutexLock() { mu_->Unlock(); }
}
explicit MutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete; MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete; MutexLock& operator=(const MutexLock&) = delete;
private: private:
gpr_mu* const mu_; Mutex* const mu_;
}; };
class ReleasableMutexLock { class ReleasableMutexLock {
public: public:
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { explicit ReleasableMutexLock(Mutex* mu) : mu_(mu) { mu_->Lock(); }
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
~ReleasableMutexLock() { ~ReleasableMutexLock() {
if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_); if (!released_) mu_->Unlock();
} }
ReleasableMutexLock(const ReleasableMutexLock&) = delete; ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete; ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Lock() { void Release() {
GPR_DEBUG_ASSERT(released_);
g_core_codegen_interface->gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
GPR_DEBUG_ASSERT(!released_); GPR_DEBUG_ASSERT(!released_);
released_ = true; released_ = true;
g_core_codegen_interface->gpr_mu_unlock(mu_); mu_->Unlock();
} }
private: private:
gpr_mu* const mu_; Mutex* const mu_;
bool released_ = false; bool released_ = false;
}; };
@ -124,27 +122,27 @@ class CondVar {
CondVar& operator=(const CondVar&) = delete; CondVar& operator=(const CondVar&) = delete;
void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); } void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); }
void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); } void SignalAll() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); }
int Wait(Mutex* mu) { void Wait(Mutex* mu) {
return Wait(mu, g_core_codegen_interface->gpr_cv_wait(
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); &cv_, &mu->mu_,
} g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline);
}
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
} }
private: private:
gpr_cv cv_; gpr_cv cv_;
}; };
#endif // GRPCPP_ABSEIL_SYNC
template <typename Predicate>
static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) {
while (!pred()) {
cv->Wait(mu);
}
}
} // namespace internal } // namespace internal
} // namespace grpc } // namespace grpc

@ -908,7 +908,7 @@ Server::~Server() {
{ {
grpc::internal::ReleasableMutexLock lock(&mu_); grpc::internal::ReleasableMutexLock lock(&mu_);
if (started_ && !shutdown_) { if (started_ && !shutdown_) {
lock.Unlock(); lock.Release();
Shutdown(); Shutdown();
} else if (!started_) { } else if (!started_) {
// Shutdown the completion queues // Shutdown the completion queues
@ -1084,7 +1084,8 @@ void Server::UnrefAndWaitLocked() {
shutdown_done_ = true; shutdown_done_ = true;
return; // no need to wait on CV since done condition already set return; // no need to wait on CV since done condition already set
} }
shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); grpc::internal::WaitUntil(&shutdown_done_cv_, &mu_,
[this] { return shutdown_done_; });
} }
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
@ -1246,7 +1247,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
} }
shutdown_notified_ = true; shutdown_notified_ = true;
shutdown_cv_.Broadcast(); shutdown_cv_.SignalAll();
#ifndef NDEBUG #ifndef NDEBUG
// Unregister this server with the CQs passed into it by the user so that // Unregister this server with the CQs passed into it by the user so that

@ -374,7 +374,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc::internal::CondVar cond; grpc::internal::CondVar cond;
thread_ = absl::make_unique<std::thread>( thread_ = absl::make_unique<std::thread>(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)); std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
cond.WaitUntil(&mu, [this] { return server_ready_; }); grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; });
server_ready_ = false; server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete"); gpr_log(GPR_INFO, "server startup complete");
} }

@ -273,7 +273,8 @@ class BalancerServiceImpl : public BalancerService {
} }
{ {
grpc::internal::MutexLock lock(&mu_); grpc::internal::MutexLock lock(&mu_);
serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; }); grpc::internal::WaitUntil(&serverlist_cond_, &mu_,
[this] { return serverlist_done_; });
} }
if (client_load_reporting_interval_seconds_ > 0) { if (client_load_reporting_interval_seconds_ > 0) {
@ -333,8 +334,8 @@ class BalancerServiceImpl : public BalancerService {
grpc::internal::CondVar cv; grpc::internal::CondVar cv;
if (load_report_queue_.empty()) { if (load_report_queue_.empty()) {
load_report_cond_ = &cv; load_report_cond_ = &cv;
load_report_cond_->WaitUntil( grpc::internal::WaitUntil(load_report_cond_, &mu_,
&mu_, [this] { return !load_report_queue_.empty(); }); [this] { return !load_report_queue_.empty(); });
load_report_cond_ = nullptr; load_report_cond_ = nullptr;
} }
ClientStats load_report = std::move(load_report_queue_.front()); ClientStats load_report = std::move(load_report_queue_.front());
@ -346,7 +347,7 @@ class BalancerServiceImpl : public BalancerService {
grpc::internal::MutexLock lock(&mu_); grpc::internal::MutexLock lock(&mu_);
if (!serverlist_done_) { if (!serverlist_done_) {
serverlist_done_ = true; serverlist_done_ = true;
serverlist_cond_.Broadcast(); serverlist_cond_.SignalAll();
} }
} }

@ -205,7 +205,7 @@ TEST_F(MockCallbackTest, MockedCallSucceedsWithWait) {
req.set_message("mock 1"); req.set_message("mock 1");
auto* reactor = service_.Echo(&ctx, &req, &resp); auto* reactor = service_.Echo(&ctx, &req, &resp);
cv.WaitUntil(&mu, [&] { grpc::internal::WaitUntil(&cv, &mu, [&] {
grpc::internal::MutexLock l(&mu); grpc::internal::MutexLock l(&mu);
return status_set; return status_set;
}); });

@ -319,7 +319,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
grpc::internal::CondVar cond; grpc::internal::CondVar cond;
thread_ = absl::make_unique<std::thread>( thread_ = absl::make_unique<std::thread>(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)); std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
cond.WaitUntil(&mu, [this] { return server_ready_; }); grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; });
server_ready_ = false; server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete"); gpr_log(GPR_INFO, "server startup complete");
} }

Loading…
Cancel
Save