Make grpcpp/mutex compatible absl/mutex

pull/25387/head
Esun Kim 4 years ago
parent e8400c0059
commit 6468d8a69d
  1. 80
      include/grpcpp/impl/codegen/sync.h
  2. 7
      src/cpp/server/server_cc.cc
  3. 2
      test/cpp/end2end/client_lb_end2end_test.cc
  4. 9
      test/cpp/end2end/grpclb_end2end_test.cc
  5. 2
      test/cpp/end2end/mock_test.cc
  6. 2
      test/cpp/end2end/service_config_end2end_test.cc

@ -32,6 +32,10 @@
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#ifdef GRPCPP_ABSEIL_SYNC
#include "absl/synchronization/mutex.h"
#endif
// The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in
@ -44,6 +48,15 @@
namespace grpc {
namespace internal {
#ifdef GRPCPP_ABSEIL_SYNC
using Mutex = absl::Mutex;
using MutexLock = absl::MutexLock;
using ReleasableMutexLock = absl::ReleasableMutexLock;
using CondVar = absl::CondVar;
#else
class Mutex {
public:
Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); }
@ -52,8 +65,8 @@ class Mutex {
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
gpr_mu* get() { return &mu_; }
const gpr_mu* get() const { return &mu_; }
void Lock() { g_core_codegen_interface->gpr_mu_lock(&mu_); }
void Unlock() { g_core_codegen_interface->gpr_mu_unlock(&mu_); }
private:
union {
@ -63,55 +76,40 @@ class Mutex {
pthread_mutex_t do_not_use_pth_;
#endif
};
friend class CondVar;
};
// MutexLock is a std::
class MutexLock {
public:
explicit MutexLock(Mutex* mu) : mu_(mu->get()) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit MutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); }
explicit MutexLock(Mutex* mu) : mu_(mu) { mu_->Lock(); }
~MutexLock() { mu_->Unlock(); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
Mutex* const mu_;
};
class ReleasableMutexLock {
public:
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu) { mu_->Lock(); }
~ReleasableMutexLock() {
if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_);
if (!released_) mu_->Unlock();
}
ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Lock() {
GPR_DEBUG_ASSERT(released_);
g_core_codegen_interface->gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
void Release() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
g_core_codegen_interface->gpr_mu_unlock(mu_);
mu_->Unlock();
}
private:
gpr_mu* const mu_;
Mutex* const mu_;
bool released_ = false;
};
@ -124,27 +122,27 @@ class CondVar {
CondVar& operator=(const CondVar&) = delete;
void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); }
void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); }
void SignalAll() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); }
int Wait(Mutex* mu) {
return Wait(mu,
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline);
}
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
void Wait(Mutex* mu) {
g_core_codegen_interface->gpr_cv_wait(
&cv_, &mu->mu_,
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
private:
gpr_cv cv_;
};
#endif // GRPCPP_ABSEIL_SYNC
template <typename Predicate>
static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) {
while (!pred()) {
cv->Wait(mu);
}
}
} // namespace internal
} // namespace grpc

@ -965,7 +965,7 @@ Server::~Server() {
{
grpc::internal::ReleasableMutexLock lock(&mu_);
if (started_ && !shutdown_) {
lock.Unlock();
lock.Release();
Shutdown();
} else if (!started_) {
// Shutdown the completion queues
@ -1141,7 +1141,8 @@ void Server::UnrefAndWaitLocked() {
shutdown_done_ = true;
return; // no need to wait on CV since done condition already set
}
shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; });
grpc::internal::WaitUntil(&shutdown_done_cv_, &mu_,
[this] { return shutdown_done_; });
}
void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
@ -1288,7 +1289,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
shutdown_notified_ = true;
shutdown_cv_.Broadcast();
shutdown_cv_.SignalAll();
#ifndef NDEBUG
// Unregister this server with the CQs passed into it by the user so that

@ -374,7 +374,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
grpc::internal::CondVar cond;
thread_ = absl::make_unique<std::thread>(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
cond.WaitUntil(&mu, [this] { return server_ready_; });
grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}

@ -273,7 +273,8 @@ class BalancerServiceImpl : public BalancerService {
}
{
grpc::internal::MutexLock lock(&mu_);
serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; });
grpc::internal::WaitUntil(&serverlist_cond_, &mu_,
[this] { return serverlist_done_; });
}
if (client_load_reporting_interval_seconds_ > 0) {
@ -333,8 +334,8 @@ class BalancerServiceImpl : public BalancerService {
grpc::internal::CondVar cv;
if (load_report_queue_.empty()) {
load_report_cond_ = &cv;
load_report_cond_->WaitUntil(
&mu_, [this] { return !load_report_queue_.empty(); });
grpc::internal::WaitUntil(load_report_cond_, &mu_,
[this] { return !load_report_queue_.empty(); });
load_report_cond_ = nullptr;
}
ClientStats load_report = std::move(load_report_queue_.front());
@ -346,7 +347,7 @@ class BalancerServiceImpl : public BalancerService {
grpc::internal::MutexLock lock(&mu_);
if (!serverlist_done_) {
serverlist_done_ = true;
serverlist_cond_.Broadcast();
serverlist_cond_.SignalAll();
}
}

@ -205,7 +205,7 @@ TEST_F(MockCallbackTest, MockedCallSucceedsWithWait) {
req.set_message("mock 1");
auto* reactor = service_.Echo(&ctx, &req, &resp);
cv.WaitUntil(&mu, [&] {
grpc::internal::WaitUntil(&cv, &mu, [&] {
grpc::internal::MutexLock l(&mu);
return status_set;
});

@ -319,7 +319,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
grpc::internal::CondVar cond;
thread_ = absl::make_unique<std::thread>(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
cond.WaitUntil(&mu, [this] { return server_ready_; });
grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}

Loading…
Cancel
Save