From 0b53341328d5d1a8047037622b622d2ddb50d5b1 Mon Sep 17 00:00:00 2001 From: Esun Kim <veblush@google.com> Date: Tue, 2 Feb 2021 10:10:25 -0800 Subject: [PATCH] Made grpc_core::Mutex compatible to absl::Mutex --- .../filters/client_channel/client_channel.cc | 5 +- .../client_channel/http_connect_handshaker.cc | 4 +- .../ext/filters/client_channel/subchannel.cc | 4 +- .../ext/filters/max_age/max_age_filter.cc | 67 +++---- src/core/lib/channel/handshaker.cc | 7 +- src/core/lib/channel/handshaker.h | 2 +- src/core/lib/gprpp/mpscq.cc | 4 +- src/core/lib/gprpp/sync.h | 169 +++++++++++++----- src/core/lib/iomgr/ev_apple.cc | 17 +- src/core/lib/iomgr/ev_epollex_linux.cc | 8 +- .../google_default_credentials.cc | 13 +- .../security/transport/security_handshaker.cc | 4 +- src/core/lib/slice/slice_intern.cc | 2 +- src/core/lib/surface/init.cc | 28 ++- src/core/lib/surface/server.cc | 2 +- src/core/lib/transport/metadata.cc | 2 +- .../alts/handshaker/alts_handshaker_client.cc | 37 ++-- .../alts/handshaker/alts_tsi_handshaker.cc | 37 ++-- .../ssl/session_cache/ssl_session_cache.cc | 2 - .../tsi/ssl/session_cache/ssl_session_cache.h | 3 +- src/cpp/server/dynamic_thread_pool.cc | 4 +- src/cpp/server/load_reporter/load_reporter.cc | 2 +- .../load_reporter_async_service_impl.cc | 8 +- src/cpp/thread_manager/thread_manager.cc | 10 +- test/core/handshake/server_ssl_common.cc | 2 +- test/core/iomgr/stranded_event_test.cc | 6 +- test/cpp/common/time_jump_test.cc | 5 +- test/cpp/end2end/xds_end2end_test.cc | 20 ++- 28 files changed, 277 insertions(+), 197 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a1a03dfe767..1171f38aa74 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -389,7 +389,7 @@ class ChannelData { // Fields guarded by a mutex, since they need to be accessed // synchronously via get_channel_info(). // - gpr_mu info_mu_; + Mutex info_mu_; UniquePtr<char> info_lb_policy_name_; UniquePtr<char> info_service_config_json_; @@ -1885,8 +1885,6 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p", this, owning_stack_); } - // Initialize data members. - gpr_mu_init(&info_mu_); // Start backup polling. grpc_client_channel_start_backup_polling(interested_parties_); // Check client channel factory. @@ -1955,7 +1953,6 @@ ChannelData::~ChannelData() { grpc_client_channel_stop_backup_polling(interested_parties_); grpc_pollset_set_destroy(interested_parties_); GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); - gpr_mu_destroy(&info_mu_); } RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy( diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index f469a5a9001..8880806440d 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -151,7 +151,7 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) { // If the write failed or we're shutting down, clean up and invoke the // callback with the error. handshaker->HandshakeFailedLocked(GRPC_ERROR_REF(error)); - lock.Unlock(); + lock.Release(); handshaker->Unref(); } else { // Otherwise, read the response. @@ -256,7 +256,7 @@ done: // Set shutdown to true so that subsequent calls to // http_connect_handshaker_shutdown() do nothing. handshaker->is_shutdown_ = true; - lock.Unlock(); + lock.Release(); handshaker->Unref(); } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index dbac59afc7b..b50295d56fd 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -1023,9 +1023,9 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); c->ContinueConnectingLocked(); - lock.Unlock(); + lock.Release(); } else { - lock.Unlock(); + lock.Release(); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } GRPC_ERROR_UNREF(error); diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index b417db51caa..ee89272fcf3 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -54,16 +54,16 @@ struct channel_data { grpc_channel_stack* channel_stack; /* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer and max_age_grace_timer_pending */ - gpr_mu max_age_timer_mu; + grpc_core::Mutex max_age_timer_mu; /* True if the max_age timer callback is currently pending */ - bool max_age_timer_pending; + bool max_age_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; /* True if the max_age_grace timer callback is currently pending */ - bool max_age_grace_timer_pending; + bool max_age_grace_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false; /* The timer for checking if the channel has reached its max age */ - grpc_timer max_age_timer; + grpc_timer max_age_timer ABSL_GUARDED_BY(max_age_timer_mu); /* The timer for checking if the max-aged channel has uesed up the grace period */ - grpc_timer max_age_grace_timer; + grpc_timer max_age_grace_timer ABSL_GUARDED_BY(max_age_timer_mu); /* The timer for checking if the channel's idle duration reaches max_connection_idle */ grpc_timer max_idle_timer; @@ -260,13 +260,15 @@ class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { static void start_max_age_timer_after_init(void* arg, grpc_error* /*error*/) { channel_data* chand = static_cast<channel_data*>(arg); - gpr_mu_lock(&chand->max_age_timer_mu); - chand->max_age_timer_pending = true; - GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); - grpc_timer_init(&chand->max_age_timer, - grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age, - &chand->close_max_age_channel); - gpr_mu_unlock(&chand->max_age_timer_mu); + { + grpc_core::MutexLock lock(&chand->max_age_timer_mu); + chand->max_age_timer_pending = true; + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); + grpc_timer_init( + &chand->max_age_timer, + grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age, + &chand->close_max_age_channel); + } grpc_transport_op* op = grpc_make_transport_op(nullptr); op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand)); op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; @@ -278,16 +280,17 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* /*error*/) { static void start_max_age_grace_timer_after_goaway_op(void* arg, grpc_error* /*error*/) { channel_data* chand = static_cast<channel_data*>(arg); - gpr_mu_lock(&chand->max_age_timer_mu); - chand->max_age_grace_timer_pending = true; - GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); - grpc_timer_init( - &chand->max_age_grace_timer, - chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE - ? GRPC_MILLIS_INF_FUTURE - : grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age_grace, - &chand->force_close_max_age_channel); - gpr_mu_unlock(&chand->max_age_timer_mu); + { + grpc_core::MutexLock lock(&chand->max_age_timer_mu); + chand->max_age_grace_timer_pending = true; + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); + grpc_timer_init(&chand->max_age_grace_timer, + chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE + ? GRPC_MILLIS_INF_FUTURE + : grpc_core::ExecCtx::Get()->Now() + + chand->max_connection_age_grace, + &chand->force_close_max_age_channel); + } GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age start_max_age_grace_timer_after_goaway_op"); } @@ -350,9 +353,10 @@ static void max_idle_timer_cb(void* arg, grpc_error* error) { static void close_max_age_channel(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(arg); - gpr_mu_lock(&chand->max_age_timer_mu); - chand->max_age_timer_pending = false; - gpr_mu_unlock(&chand->max_age_timer_mu); + { + grpc_core::MutexLock lock(&chand->max_age_timer_mu); + chand->max_age_timer_pending = false; + } if (error == GRPC_ERROR_NONE) { GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age start_max_age_grace_timer_after_goaway_op"); @@ -372,9 +376,10 @@ static void close_max_age_channel(void* arg, grpc_error* error) { static void force_close_max_age_channel(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(arg); - gpr_mu_lock(&chand->max_age_timer_mu); - chand->max_age_grace_timer_pending = false; - gpr_mu_unlock(&chand->max_age_timer_mu); + { + grpc_core::MutexLock lock(&chand->max_age_timer_mu); + chand->max_age_grace_timer_pending = false; + } if (error == GRPC_ERROR_NONE) { grpc_transport_op* op = grpc_make_transport_op(nullptr); op->disconnect_with_error = @@ -426,9 +431,7 @@ static void max_age_destroy_call_elem( static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - gpr_mu_init(&chand->max_age_timer_mu); - chand->max_age_timer_pending = false; - chand->max_age_grace_timer_pending = false; + new (chand) channel_data(); chand->channel_stack = args->channel_stack; chand->max_connection_age = add_random_max_connection_age_jitter_and_convert_to_grpc_millis( @@ -513,7 +516,7 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem, /* Destructor for channel_data. */ static void max_age_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - gpr_mu_destroy(&chand->max_age_timer_mu); + chand->~channel_data(); } const grpc_channel_filter grpc_max_age_filter = { diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index 8d1d45e5623..000a7d2e26b 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -53,7 +53,7 @@ std::string HandshakerArgsString(HandshakerArgs* args) { } // namespace -HandshakeManager::HandshakeManager() { gpr_mu_init(&mu_); } +HandshakeManager::HandshakeManager() {} /// Add \a mgr to the server side list of all pending handshake managers, the /// list starts with \a *head. @@ -104,10 +104,7 @@ void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) { handshakers_.push_back(std::move(handshaker)); } -HandshakeManager::~HandshakeManager() { - handshakers_.clear(); - gpr_mu_destroy(&mu_); -} +HandshakeManager::~HandshakeManager() { handshakers_.clear(); } void HandshakeManager::Shutdown(grpc_error* why) { { diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 541f38ed9dd..3dc6da85966 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -144,7 +144,7 @@ class HandshakeManager : public RefCounted<HandshakeManager> { static const size_t HANDSHAKERS_INIT_SIZE = 2; - gpr_mu mu_; + Mutex mu_; bool is_shutdown_ = false; // An array of handshakers added via grpc_handshake_manager_add(). absl::InlinedVector<RefCountedPtr<Handshaker>, HANDSHAKERS_INIT_SIZE> diff --git a/src/core/lib/gprpp/mpscq.cc b/src/core/lib/gprpp/mpscq.cc index 2bf9981ee26..373ec09f8e9 100644 --- a/src/core/lib/gprpp/mpscq.cc +++ b/src/core/lib/gprpp/mpscq.cc @@ -86,9 +86,9 @@ bool LockedMultiProducerSingleConsumerQueue::Push(Node* node) { LockedMultiProducerSingleConsumerQueue::Node* LockedMultiProducerSingleConsumerQueue::TryPop() { - if (gpr_mu_trylock(mu_.get())) { + if (mu_.TryLock()) { Node* node = queue_.Pop(); - gpr_mu_unlock(mu_.get()); + mu_.Unlock(); return node; } return nullptr; diff --git a/src/core/lib/gprpp/sync.h b/src/core/lib/gprpp/sync.h index 0a911b1cd26..f385883d166 100644 --- a/src/core/lib/gprpp/sync.h +++ b/src/core/lib/gprpp/sync.h @@ -26,6 +26,9 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "absl/synchronization/mutex.h" +#include "src/core/lib/gprpp/time_util.h" + // The core library is not accessible in C++ codegen headers, and vice versa. // Thus, we need to have duplicate headers with similar functionality. // Make sure any change to this file is also reflected in @@ -37,7 +40,23 @@ namespace grpc_core { -class Mutex { +#ifdef GPR_ABSEIL_SYNC + +using Mutex = absl::Mutex; +using MutexLock = absl::MutexLock; +using ReleasableMutexLock = absl::ReleasableMutexLock; +using CondVar = absl::CondVar; + +// Returns the underlying gpr_mu from Mutex. This should be used only when +// it has to like passing the C++ mutex to C-core API. +// TODO(veblush): Remove this after C-core no longer uses gpr_mu. +inline gpr_mu* GetUnderlyingGprMu(Mutex* mutex) { + return reinterpret_cast<gpr_mu*>(mutex); +} + +#else + +class ABSL_LOCKABLE Mutex { public: Mutex() { gpr_mu_init(&mu_); } ~Mutex() { gpr_mu_destroy(&mu_); } @@ -45,52 +64,59 @@ class Mutex { Mutex(const Mutex&) = delete; Mutex& operator=(const Mutex&) = delete; - gpr_mu* get() { return &mu_; } - const gpr_mu* get() const { return &mu_; } + void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { gpr_mu_lock(&mu_); } + void Unlock() ABSL_UNLOCK_FUNCTION() { gpr_mu_unlock(&mu_); } + bool TryLock() ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(true) { + return gpr_mu_trylock(&mu_) != 0; + } private: gpr_mu mu_; + + friend class CondVar; + friend gpr_mu* GetUnderlyingGprMu(Mutex* mutex); }; -// MutexLock is a std:: -class MutexLock { +// Returns the underlying gpr_mu from Mutex. This should be used only when +// it has to like passing the C++ mutex to C-core API. +// TODO(veblush): Remove this after C-core no longer uses gpr_mu. +inline gpr_mu* GetUnderlyingGprMu(Mutex* mutex) { return &mutex->mu_; } + +class ABSL_SCOPED_LOCKABLE MutexLock { public: - explicit MutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); } - explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); } - ~MutexLock() { gpr_mu_unlock(mu_); } + explicit MutexLock(Mutex* mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) : mu_(mu) { + mu_->Lock(); + } + ~MutexLock() ABSL_UNLOCK_FUNCTION() { mu_->Unlock(); } MutexLock(const MutexLock&) = delete; MutexLock& operator=(const MutexLock&) = delete; private: - gpr_mu* const mu_; + Mutex* const mu_; }; -class ReleasableMutexLock { +class ABSL_SCOPED_LOCKABLE ReleasableMutexLock { public: - explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); } - explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); } - ~ReleasableMutexLock() { - if (!released_) gpr_mu_unlock(mu_); + explicit ReleasableMutexLock(Mutex* mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) + : mu_(mu) { + mu_->Lock(); + } + ~ReleasableMutexLock() ABSL_UNLOCK_FUNCTION() { + if (!released_) mu_->Unlock(); } ReleasableMutexLock(const ReleasableMutexLock&) = delete; ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete; - void Lock() { - GPR_DEBUG_ASSERT(released_); - gpr_mu_lock(mu_); - released_ = false; - } - - void Unlock() { + void Release() ABSL_UNLOCK_FUNCTION() { GPR_DEBUG_ASSERT(!released_); released_ = true; - gpr_mu_unlock(mu_); + mu_->Unlock(); } private: - gpr_mu* const mu_; + Mutex* const mu_; bool released_ = false; }; @@ -103,31 +129,94 @@ class CondVar { CondVar& operator=(const CondVar&) = delete; void Signal() { gpr_cv_signal(&cv_); } - void Broadcast() { gpr_cv_broadcast(&cv_); } + void SignalAll() { gpr_cv_broadcast(&cv_); } + + void Wait(Mutex* mu) { WaitWithDeadline(mu, absl::InfiniteFuture()); } + bool WaitWithTimeout(Mutex* mu, absl::Duration timeout) { + return gpr_cv_wait(&cv_, &mu->mu_, ToGprTimeSpec(timeout)) != 0; + } + bool WaitWithDeadline(Mutex* mu, absl::Time deadline) { + return gpr_cv_wait(&cv_, &mu->mu_, ToGprTimeSpec(deadline)) != 0; + } + + private: + gpr_cv cv_; +}; + +#endif // GPR_ABSEIL_SYNC + +template <typename Predicate> +static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) { + while (!pred()) { + cv->Wait(mu); + } +} + +// Returns true iff we timed-out +template <typename Predicate> +static bool WaitUntilWithTimeout(CondVar* cv, Mutex* mu, Predicate pred, + absl::Duration timeout) { + while (!pred()) { + if (cv->WaitWithTimeout(mu, timeout)) return true; + } + return false; +} + +// Returns true iff we timed-out +template <typename Predicate> +static bool WaitUntilWithDeadline(CondVar* cv, Mutex* mu, Predicate pred, + absl::Time deadline) { + while (!pred()) { + if (cv->WaitWithDeadline(mu, deadline)) return true; + } + return false; +} + +// Deprecated. Prefer MutexLock +class MutexLockForGprMu { + public: + explicit MutexLockForGprMu(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); } + ~MutexLockForGprMu() { gpr_mu_unlock(mu_); } + + MutexLockForGprMu(const MutexLock&) = delete; + MutexLockForGprMu& operator=(const MutexLock&) = delete; + + private: + gpr_mu* const mu_; +}; - int Wait(Mutex* mu) { return Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } - int Wait(Mutex* mu, const gpr_timespec& deadline) { - return gpr_cv_wait(&cv_, mu->get(), deadline); +// Deprecated. Prefer MutexLock or ReleasableMutexLock +class ABSL_SCOPED_LOCKABLE LockableAndReleasableMutexLock { + public: + explicit LockableAndReleasableMutexLock(Mutex* mu) + ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) + : mu_(mu) { + mu_->Lock(); } + ~LockableAndReleasableMutexLock() ABSL_UNLOCK_FUNCTION() { + if (!released_) mu_->Unlock(); + } + + LockableAndReleasableMutexLock(const LockableAndReleasableMutexLock&) = + delete; + LockableAndReleasableMutexLock& operator=( + const LockableAndReleasableMutexLock&) = delete; - template <typename Predicate> - void WaitUntil(Mutex* mu, Predicate pred) { - while (!pred()) { - Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } + void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { + GPR_DEBUG_ASSERT(released_); + mu_->Lock(); + released_ = false; } - // Returns true iff we timed-out - template <typename Predicate> - bool WaitUntil(Mutex* mu, Predicate pred, const gpr_timespec& deadline) { - while (!pred()) { - if (Wait(mu, deadline)) return true; - } - return false; + void Release() ABSL_UNLOCK_FUNCTION() { + GPR_DEBUG_ASSERT(!released_); + released_ = true; + mu_->Unlock(); } private: - gpr_cv cv_; + Mutex* const mu_; + bool released_ = false; }; } // namespace grpc_core diff --git a/src/core/lib/iomgr/ev_apple.cc b/src/core/lib/iomgr/ev_apple.cc index d1525828865..308ee0f5955 100644 --- a/src/core/lib/iomgr/ev_apple.cc +++ b/src/core/lib/iomgr/ev_apple.cc @@ -33,7 +33,10 @@ #include <list> +#include "absl/time/time.h" + #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/gprpp/time_util.h" #include "src/core/lib/iomgr/ev_apple.h" grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling"); @@ -161,7 +164,7 @@ void grpc_apple_register_write_stream(CFWriteStreamRef write_stream, /// Drive the run loop in a global singleton thread until the global run loop is /// shutdown. static void GlobalRunLoopFunc(void* arg) { - grpc_core::ReleasableMutexLock lock(&gGlobalRunLoopContext->mu); + grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu); gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent(); gGlobalRunLoopContext->init_cv.Signal(); @@ -173,11 +176,11 @@ static void GlobalRunLoopFunc(void* arg) { gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu); } gGlobalRunLoopContext->input_source_registered = false; - lock.Unlock(); + lock.Release(); CFRunLoopRun(); lock.Lock(); } - lock.Unlock(); + lock.Release(); } // pollset implementation @@ -237,9 +240,9 @@ static grpc_error* pollset_work(grpc_pollset* pollset, auto it = apple_pollset->workers.begin(); while (!actual_worker.kicked && !apple_pollset->is_shutdown) { - if (actual_worker.cv.Wait( - &apple_pollset->mu, - grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { + if (actual_worker.cv.WaitWithDeadline( + &apple_pollset->mu, grpc_core::ToAbslTime(grpc_millis_to_timespec( + deadline, GPR_CLOCK_REALTIME)))) { // timed out break; } @@ -299,7 +302,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset, static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { GRPC_POLLING_TRACE("pollset init: %p", pollset); GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset(); - *mu = apple_pollset->mu.get(); + *mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu); } /// The caller must acquire the lock GrpcApplePollset.mu before calling this diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index acd095a43ed..f26a99293af 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -537,7 +537,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) { const int epfd = pollset->active_pollable->epfd; - grpc_core::MutexLock lock(&fd->pollable_mu); + grpc_core::MutexLockForGprMu lock(&fd->pollable_mu); for (size_t i = 0; i < fd->pollset_fds.size(); ++i) { if (fd->pollset_fds[i] == epfd) { return true; @@ -548,7 +548,7 @@ static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) { static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) { const int epfd = pollset->active_pollable->epfd; - grpc_core::MutexLock lock(&fd->pollable_mu); + grpc_core::MutexLockForGprMu lock(&fd->pollable_mu); fd->pollset_fds.push_back(epfd); } @@ -684,7 +684,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) { static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) { GPR_TIMER_SCOPE("kick_one_worker", 0); pollable* p = specific_worker->pollable_obj; - grpc_core::MutexLock lock(&p->mu); + grpc_core::MutexLockForGprMu lock(&p->mu); GPR_ASSERT(specific_worker != nullptr); if (specific_worker->kicked) { if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { @@ -1296,7 +1296,7 @@ static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) { return; } - grpc_core::MutexLock lock(&pollset->mu); + grpc_core::MutexLockForGprMu lock(&pollset->mu); grpc_error* error = pollset_add_fd_locked(pollset, fd); // If we are in PO_MULTI mode, we should update the pollsets of the FD. diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.cc b/src/core/lib/security/credentials/google_default/google_default_credentials.cc index d1de87db04d..19099343c16 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.cc +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.cc @@ -61,7 +61,7 @@ using grpc_core::Json; * means the detection is done via network test that is unreliable and the * unreliable result should not be referred by successive calls. */ static int g_metadata_server_available = 0; -static gpr_mu g_state_mu; +static grpc_core::Mutex* g_state_mu; /* Protect a metadata_server_detector instance that can be modified by more than * one gRPC threads */ static gpr_mu* g_polling_mu; @@ -69,7 +69,9 @@ static gpr_once g_once = GPR_ONCE_INIT; static grpc_core::internal::grpc_gce_tenancy_checker g_gce_tenancy_checker = grpc_alts_is_running_on_gcp; -static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); } +static void init_default_credentials(void) { + g_state_mu = new grpc_core::Mutex(); +} struct metadata_server_detector { grpc_polling_entity pollent; @@ -282,7 +284,7 @@ end: static void update_tenancy() { gpr_once_init(&g_once, init_default_credentials); - grpc_core::MutexLock lock(&g_state_mu); + grpc_core::MutexLock lock(g_state_mu); /* Try a platform-provided hint for GCE. */ if (!g_metadata_server_available) { @@ -297,7 +299,7 @@ static void update_tenancy() { } static bool metadata_server_available() { - grpc_core::MutexLock lock(&g_state_mu); + grpc_core::MutexLock lock(g_state_mu); return static_cast<bool>(g_metadata_server_available); } @@ -387,9 +389,8 @@ void set_gce_tenancy_checker_for_testing(grpc_gce_tenancy_checker checker) { void grpc_flush_cached_google_default_credentials(void) { grpc_core::ExecCtx exec_ctx; gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_state_mu); + grpc_core::MutexLock lock(g_state_mu); g_metadata_server_available = 0; - gpr_mu_unlock(&g_state_mu); } } // namespace internal diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index e7b98850132..9e0e4032aa5 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -84,7 +84,7 @@ class SecurityHandshaker : public Handshaker { tsi_handshaker* handshaker_; RefCountedPtr<grpc_security_connector> connector_; - gpr_mu mu_; + Mutex mu_; bool is_shutdown_ = false; // Endpoint and read buffer to destroy after a shutdown. @@ -120,14 +120,12 @@ SecurityHandshaker::SecurityHandshaker(tsi_handshaker* handshaker, max_frame_size_ = grpc_channel_arg_get_integer( arg, {0, 0, std::numeric_limits<int>::max()}); } - gpr_mu_init(&mu_); grpc_slice_buffer_init(&outgoing_); GRPC_CLOSURE_INIT(&on_peer_checked_, &SecurityHandshaker::OnPeerCheckedFn, this, grpc_schedule_on_exec_ctx); } SecurityHandshaker::~SecurityHandshaker() { - gpr_mu_destroy(&mu_); tsi_handshaker_destroy(handshaker_); tsi_handshaker_result_destroy(handshaker_result_); if (endpoint_to_destroy_ != nullptr) { diff --git a/src/core/lib/slice/slice_intern.cc b/src/core/lib/slice/slice_intern.cc index 6e03f48054f..ffb72db89bb 100644 --- a/src/core/lib/slice/slice_intern.cc +++ b/src/core/lib/slice/slice_intern.cc @@ -69,7 +69,7 @@ static bool g_forced_hash_seed = false; InternedSliceRefcount::~InternedSliceRefcount() { slice_shard* shard = &g_shards[SHARD_IDX(this->hash)]; - MutexLock lock(&shard->mu); + MutexLockForGprMu lock(&shard->mu); InternedSliceRefcount** prev_next; InternedSliceRefcount* cur; for (prev_next = &shard->strs[TABLE_IDX(this->hash, shard->capacity)], diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 0b2151b73b9..41866f12d75 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -63,16 +63,15 @@ extern void grpc_register_built_in_plugins(void); #define MAX_PLUGINS 128 static gpr_once g_basic_init = GPR_ONCE_INIT; -static gpr_mu g_init_mu; +static grpc_core::Mutex* g_init_mu; static int g_initializations; -static gpr_cv* g_shutting_down_cv; +static grpc_core::CondVar* g_shutting_down_cv; static bool g_shutting_down; static void do_basic_init(void) { gpr_log_verbosity_init(); - gpr_mu_init(&g_init_mu); - g_shutting_down_cv = static_cast<gpr_cv*>(malloc(sizeof(gpr_cv))); - gpr_cv_init(g_shutting_down_cv); + g_init_mu = new grpc_core::Mutex(); + g_shutting_down_cv = new grpc_core::CondVar(); g_shutting_down = false; grpc_register_built_in_plugins(); grpc_cq_global_init(); @@ -130,11 +129,11 @@ void grpc_init(void) { int i; gpr_once_init(&g_basic_init, do_basic_init); - grpc_core::MutexLock lock(&g_init_mu); + grpc_core::MutexLock lock(g_init_mu); if (++g_initializations == 1) { if (g_shutting_down) { g_shutting_down = false; - gpr_cv_broadcast(g_shutting_down_cv); + g_shutting_down_cv->SignalAll(); } grpc_core::Fork::GlobalInit(); grpc_fork_handlers_auto_register(); @@ -196,14 +195,14 @@ void grpc_shutdown_internal_locked(void) { grpc_core::ExecCtx::GlobalShutdown(); grpc_core::ApplicationCallbackExecCtx::GlobalShutdown(); g_shutting_down = false; - gpr_cv_broadcast(g_shutting_down_cv); + g_shutting_down_cv->SignalAll(); // Absolute last action will be to delete static metadata context. grpc_destroy_static_metadata_ctx(); } void grpc_shutdown_internal(void* /*ignored*/) { GRPC_API_TRACE("grpc_shutdown_internal", 0, ()); - grpc_core::MutexLock lock(&g_init_mu); + grpc_core::MutexLock lock(g_init_mu); // We have released lock from the shutdown thread and it is possible that // another grpc_init has been called, and do nothing if that is the case. if (--g_initializations != 0) { @@ -214,7 +213,7 @@ void grpc_shutdown_internal(void* /*ignored*/) { void grpc_shutdown(void) { GRPC_API_TRACE("grpc_shutdown(void)", 0, ()); - grpc_core::MutexLock lock(&g_init_mu); + grpc_core::MutexLock lock(g_init_mu); if (--g_initializations == 0) { grpc_core::ApplicationCallbackExecCtx* acec = @@ -243,7 +242,7 @@ void grpc_shutdown(void) { void grpc_shutdown_blocking(void) { GRPC_API_TRACE("grpc_shutdown_blocking(void)", 0, ()); - grpc_core::MutexLock lock(&g_init_mu); + grpc_core::MutexLock lock(g_init_mu); if (--g_initializations == 0) { g_shutting_down = true; grpc_shutdown_internal_locked(); @@ -253,16 +252,15 @@ void grpc_shutdown_blocking(void) { int grpc_is_initialized(void) { int r; gpr_once_init(&g_basic_init, do_basic_init); - grpc_core::MutexLock lock(&g_init_mu); + grpc_core::MutexLock lock(g_init_mu); r = g_initializations > 0; return r; } void grpc_maybe_wait_for_async_shutdown(void) { gpr_once_init(&g_basic_init, do_basic_init); - grpc_core::MutexLock lock(&g_init_mu); + grpc_core::MutexLock lock(g_init_mu); while (g_shutting_down) { - gpr_cv_wait(g_shutting_down_cv, &g_init_mu, - gpr_inf_future(GPR_CLOCK_REALTIME)); + g_shutting_down_cv->Wait(g_init_mu); } } diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 967f744cee4..cf57592825c 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -796,7 +796,7 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) { { // Wait for startup to be finished. Locks mu_global. MutexLock lock(&mu_global_); - starting_cv_.WaitUntil(&mu_global_, [this] { return !starting_; }); + WaitUntil(&starting_cv_, &mu_global_, [this] { return !starting_; }); // Stay locked, and gather up some stuff to do. GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (shutdown_published_) { diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc index aa5e58136e5..ef1e07d2cd0 100644 --- a/src/core/lib/transport/metadata.cc +++ b/src/core/lib/transport/metadata.cc @@ -596,7 +596,7 @@ static void* set_user_data(UserData* ud, void (*destroy_func)(void*), grpc_core::ReleasableMutexLock lock(&ud->mu_user_data); if (ud->destroy_user_data.Load(grpc_core::MemoryOrder::RELAXED)) { /* user data can only be set once */ - lock.Unlock(); + lock.Release(); if (destroy_func != nullptr) { destroy_func(data); } diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index 845b23fe65b..041a46f4cf4 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -69,9 +69,9 @@ typedef struct alts_grpc_handshaker_client { grpc_closure on_handshaker_service_resp_recv; /* Buffers containing information to be sent (or received) to (or from) the * handshaker service. */ - grpc_byte_buffer* send_buffer; - grpc_byte_buffer* recv_buffer; - grpc_status_code status; + grpc_byte_buffer* send_buffer = nullptr; + grpc_byte_buffer* recv_buffer = nullptr; + grpc_status_code status = GRPC_STATUS_OK; /* Initial metadata to be received from handshaker service. */ grpc_metadata_array recv_initial_metadata; /* A callback function provided by an application to be invoked when response @@ -95,15 +95,15 @@ typedef struct alts_grpc_handshaker_client { /** callback for receiving handshake call status */ grpc_closure on_status_received; /** gRPC status code of handshake call */ - grpc_status_code handshake_status_code; + grpc_status_code handshake_status_code = GRPC_STATUS_OK; /** gRPC status details of handshake call */ grpc_slice handshake_status_details; /* mu synchronizes all fields below including their internal fields. */ - gpr_mu mu; + grpc_core::Mutex mu; /* indicates if the handshaker call's RECV_STATUS_ON_CLIENT op is done. */ - bool receive_status_finished; + bool receive_status_finished = false; /* if non-null, contains arguments to complete a TSI next callback. */ - recv_message_result* pending_recv_message_result; + recv_message_result* pending_recv_message_result = nullptr; /* Maximum frame size used by frame protector. */ size_t max_frame_size; } alts_grpc_handshaker_client; @@ -140,8 +140,7 @@ static void alts_grpc_handshaker_client_unref( grpc_alts_credentials_options_destroy(client->options); gpr_free(client->buffer); grpc_slice_unref_internal(client->handshake_status_details); - gpr_mu_destroy(&client->mu); - gpr_free(client); + delete client; } } @@ -695,24 +694,24 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( gpr_log(GPR_ERROR, "Invalid arguments to alts_handshaker_client_create()"); return nullptr; } - alts_grpc_handshaker_client* client = - static_cast<alts_grpc_handshaker_client*>(gpr_zalloc(sizeof(*client))); - gpr_mu_init(&client->mu); + alts_grpc_handshaker_client* client = new alts_grpc_handshaker_client(); + memset(&client->base, 0, sizeof(client->base)); + client->base.vtable = + vtable_for_testing == nullptr ? &vtable : vtable_for_testing; gpr_ref_init(&client->refs, 1); - client->grpc_caller = grpc_call_start_batch_and_execute; client->handshaker = handshaker; + client->grpc_caller = grpc_call_start_batch_and_execute; + grpc_metadata_array_init(&client->recv_initial_metadata); client->cb = cb; client->user_data = user_data; - client->send_buffer = nullptr; - client->recv_buffer = nullptr; client->options = grpc_alts_credentials_options_copy(options); client->target_name = grpc_slice_copy(target_name); - client->recv_bytes = grpc_empty_slice(); - grpc_metadata_array_init(&client->recv_initial_metadata); client->is_client = is_client; - client->max_frame_size = max_frame_size; + client->recv_bytes = grpc_empty_slice(); client->buffer_size = TSI_ALTS_INITIAL_BUFFER_SIZE; client->buffer = static_cast<unsigned char*>(gpr_zalloc(client->buffer_size)); + client->handshake_status_details = grpc_empty_slice(); + client->max_frame_size = max_frame_size; grpc_slice slice = grpc_slice_from_copied_string(handshaker_service_url); client->call = strcmp(handshaker_service_url, ALTS_HANDSHAKER_SERVICE_URL_FOR_TESTING) == @@ -722,8 +721,6 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( channel, nullptr, GRPC_PROPAGATE_DEFAULTS, interested_parties, grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice, GRPC_MILLIS_INF_FUTURE, nullptr); - client->base.vtable = - vtable_for_testing == nullptr ? &vtable : vtable_for_testing; GRPC_CLOSURE_INIT(&client->on_handshaker_service_resp_recv, grpc_cb, client, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&client->on_status_received, on_status_received, client, diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index 19f41fe63d0..60df0ce71f4 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -48,23 +48,23 @@ struct alts_tsi_handshaker { tsi_handshaker base; grpc_slice target_name; bool is_client; - bool has_sent_start_message; - bool has_created_handshaker_client; + bool has_sent_start_message = false; + bool has_created_handshaker_client = false; char* handshaker_service_url; grpc_pollset_set* interested_parties; grpc_alts_credentials_options* options; - alts_handshaker_client_vtable* client_vtable_for_testing; - grpc_channel* channel; + alts_handshaker_client_vtable* client_vtable_for_testing = nullptr; + grpc_channel* channel = nullptr; bool use_dedicated_cq; // mu synchronizes all fields below. Note these are the // only fields that can be concurrently accessed (due to // potential concurrency of tsi_handshaker_shutdown and // tsi_handshaker_next). - gpr_mu mu; - alts_handshaker_client* client; + grpc_core::Mutex mu; + alts_handshaker_client* client = nullptr; // shutdown effectively follows base.handshake_shutdown, // but is synchronized by the mutex of this object. - bool shutdown; + bool shutdown = false; // Maximum frame size used by frame protector. size_t max_frame_size; }; @@ -592,8 +592,7 @@ static void handshaker_destroy(tsi_handshaker* self) { grpc_channel_destroy_internal(handshaker->channel); } gpr_free(handshaker->handshaker_service_url); - gpr_mu_destroy(&handshaker->mu); - gpr_free(handshaker); + delete handshaker; } static const tsi_handshaker_vtable handshaker_vtable = { @@ -628,26 +627,22 @@ tsi_result alts_tsi_handshaker_create( gpr_log(GPR_ERROR, "Invalid arguments to alts_tsi_handshaker_create()"); return TSI_INVALID_ARGUMENT; } - alts_tsi_handshaker* handshaker = - static_cast<alts_tsi_handshaker*>(gpr_zalloc(sizeof(*handshaker))); - gpr_mu_init(&handshaker->mu); - handshaker->use_dedicated_cq = interested_parties == nullptr; - handshaker->client = nullptr; - handshaker->is_client = is_client; - handshaker->has_sent_start_message = false; + bool use_dedicated_cq = interested_parties == nullptr; + alts_tsi_handshaker* handshaker = new alts_tsi_handshaker(); + memset(&handshaker->base, 0, sizeof(handshaker->base)); + handshaker->base.vtable = + use_dedicated_cq ? &handshaker_vtable_dedicated : &handshaker_vtable; handshaker->target_name = target_name == nullptr ? grpc_empty_slice() : grpc_slice_from_static_string(target_name); - handshaker->interested_parties = interested_parties; - handshaker->has_created_handshaker_client = false; + handshaker->is_client = is_client; handshaker->handshaker_service_url = gpr_strdup(handshaker_service_url); + handshaker->interested_parties = interested_parties; handshaker->options = grpc_alts_credentials_options_copy(options); + handshaker->use_dedicated_cq = use_dedicated_cq; handshaker->max_frame_size = user_specified_max_frame_size != 0 ? user_specified_max_frame_size : kTsiAltsMaxFrameSize; - handshaker->base.vtable = handshaker->use_dedicated_cq - ? &handshaker_vtable_dedicated - : &handshaker_vtable; *self = &handshaker->base; return TSI_OK; } diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc index 6da4fb4ee1c..9254dcee5f6 100644 --- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc +++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc @@ -84,7 +84,6 @@ class SslSessionLRUCache::Node { SslSessionLRUCache::SslSessionLRUCache(size_t capacity) : capacity_(capacity) { GPR_ASSERT(capacity > 0); - gpr_mu_init(&lock_); entry_by_key_ = grpc_avl_create(&cache_avl_vtable); } @@ -96,7 +95,6 @@ SslSessionLRUCache::~SslSessionLRUCache() { node = next; } grpc_avl_unref(entry_by_key_, nullptr); - gpr_mu_destroy(&lock_); } size_t SslSessionLRUCache::Size() { diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.h b/src/core/tsi/ssl/session_cache/ssl_session_cache.h index e805ed6b14e..0aa3f1696c2 100644 --- a/src/core/tsi/ssl/session_cache/ssl_session_cache.h +++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.h @@ -31,6 +31,7 @@ extern "C" { #include "src/core/lib/avl/avl.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/tsi/ssl/session_cache/ssl_session.h" /// Cache for SSL sessions for sessions resumption. @@ -76,7 +77,7 @@ class SslSessionLRUCache : public grpc_core::RefCounted<SslSessionLRUCache> { void PushFront(Node* node); void AssertInvariants(); - gpr_mu lock_; + grpc_core::Mutex lock_; size_t capacity_; Node* use_order_list_head_ = nullptr; diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index 4131dc66110..e96dc4c4551 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -68,7 +68,7 @@ void DynamicThreadPool::ThreadFunc() { if (!callbacks_.empty()) { auto cb = callbacks_.front(); callbacks_.pop(); - lock.Unlock(); + lock.Release(); cb(); } else if (shutdown_) { break; @@ -97,7 +97,7 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) { DynamicThreadPool::~DynamicThreadPool() { grpc_core::MutexLock lock(&mu_); shutdown_ = true; - cv_.Broadcast(); + cv_.SignalAll(); while (nthreads_ != 0) { shutdown_cv_.Wait(&mu_); } diff --git a/src/cpp/server/load_reporter/load_reporter.cc b/src/cpp/server/load_reporter/load_reporter.cc index 1a8ad4a250e..613cf3a03bc 100644 --- a/src/cpp/server/load_reporter/load_reporter.cc +++ b/src/cpp/server/load_reporter/load_reporter.cc @@ -279,7 +279,7 @@ LoadReporter::GenerateLoadBalancingFeedback() { double cpu_limit = newest->cpu_limit - oldest->cpu_limit; std::chrono::duration<double> duration_seconds = newest->end_time - oldest->end_time; - lock.Unlock(); + lock.Release(); ::grpc::lb::v1::LoadBalancingFeedback feedback; feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit)); feedback.set_calls_per_second( diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc index b76cd5db755..e2a14efc501 100644 --- a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc +++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc @@ -171,7 +171,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered( { grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.Unlock(); + lock.Release(); Shutdown(std::move(self), "OnRequestDelivered"); return; } @@ -229,7 +229,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone( { grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.Unlock(); + lock.Release(); Shutdown(std::move(self), "OnReadDone"); return; } @@ -261,7 +261,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport( { grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.Unlock(); + lock.Release(); Shutdown(std::move(self), "ScheduleNextReport"); return; } @@ -301,7 +301,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport( { grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.Unlock(); + lock.Release(); Shutdown(std::move(self), "SendReport"); return; } diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index c8560aa81dd..5155f610a8c 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -152,7 +152,7 @@ void ThreadManager::MainWorkLoop() { bool ok; WorkStatus work_status = PollForWork(&tag, &ok); - grpc_core::ReleasableMutexLock lock(&mu_); + grpc_core::LockableAndReleasableMutexLock lock(&mu_); // Reduce the number of pollers by 1 and check what happened with the poll num_pollers_--; bool done = false; @@ -179,7 +179,7 @@ void ThreadManager::MainWorkLoop() { max_active_threads_sofar_ = num_threads_; } // Drop lock before spawning thread to avoid contention - lock.Unlock(); + lock.Release(); WorkerThread* worker = new WorkerThread(this); if (worker->created()) { worker->Start(); @@ -195,17 +195,17 @@ void ThreadManager::MainWorkLoop() { // There is still at least some thread polling, so we can go on // even though we are below the number of pollers that we would // like to have (min_pollers_) - lock.Unlock(); + lock.Release(); } else { // There are no pollers to spare and we couldn't allocate // a new thread, so resources are exhausted! - lock.Unlock(); + lock.Release(); resource_exhausted = true; } } else { // There are a sufficient number of pollers available so we can do // the work and continue polling with our existing poller threads - lock.Unlock(); + lock.Release(); } // Lock is always released at this point - do the application work // or return resource exhausted if there is new work but we couldn't diff --git a/test/core/handshake/server_ssl_common.cc b/test/core/handshake/server_ssl_common.cc index aa9836713b5..e16fc0e2b88 100644 --- a/test/core/handshake/server_ssl_common.cc +++ b/test/core/handshake/server_ssl_common.cc @@ -86,7 +86,7 @@ class ServerInfo { void Await() { grpc_core::MutexLock lock(&mu_); - cv_.WaitUntil(&mu_, [this] { return ready_; }); + grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; }); } private: diff --git a/test/core/iomgr/stranded_event_test.cc b/test/core/iomgr/stranded_event_test.cc index 43d082d6ccc..64f2c591f21 100644 --- a/test/core/iomgr/stranded_event_test.cc +++ b/test/core/iomgr/stranded_event_test.cc @@ -394,7 +394,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) { for (int i = 1; i <= kNumMessagePingPongsPerCall; i++) { { grpc_core::MutexLock lock(&ping_pong_round_mu); - ping_pong_round_cv.Broadcast(); + ping_pong_round_cv.SignalAll(); while (int(ping_pong_round) != i) { ping_pong_round_cv.Wait(&ping_pong_round_mu); } @@ -404,7 +404,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) { { grpc_core::MutexLock lock(&ping_pong_round_mu); ping_pongs_done++; - ping_pong_round_cv.Broadcast(); + ping_pong_round_cv.SignalAll(); } } gpr_log(GPR_DEBUG, "now receive status on call with server address:%s", @@ -425,7 +425,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) { ping_pong_round_cv.Wait(&ping_pong_round_mu); } ping_pong_round++; - ping_pong_round_cv.Broadcast(); + ping_pong_round_cv.SignalAll(); gpr_log(GPR_DEBUG, "initiate ping pong round: %ld", ping_pong_round); } } diff --git a/test/cpp/common/time_jump_test.cc b/test/cpp/common/time_jump_test.cc index cbf88df28aa..4836905bb9b 100644 --- a/test/cpp/common/time_jump_test.cc +++ b/test/cpp/common/time_jump_test.cc @@ -34,6 +34,8 @@ #include "src/core/lib/iomgr/timer_manager.h" #include "test/core/util/test_config.h" +#include "absl/time/time.h" + extern char** environ; #ifdef GPR_ANDROID @@ -116,8 +118,7 @@ TEST_P(TimeJumpTest, TimedWait) { run_cmd(cmd.str().c_str()); }); gpr_timespec before = gpr_now(GPR_CLOCK_MONOTONIC); - int timedout = cond.Wait( - &mu, grpc_millis_to_timespec(kWaitTimeMs, GPR_CLOCK_REALTIME)); + bool timedout = cond.WaitWithTimeout(&mu, absl::Milliseconds(kWaitTimeMs)); gpr_timespec after = gpr_now(GPR_CLOCK_MONOTONIC); int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(after, before)); gpr_log(GPR_DEBUG, "After wait, timedout = %d elapsed_ms = %d", timedout, diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 150e6964b28..e1033f2fdb3 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -62,6 +62,7 @@ #include "src/core/lib/gpr/tmpfile.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/time_util.h" #include "src/core/lib/iomgr/load_file.h" #include "src/core/lib/iomgr/parse_address.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -640,7 +641,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> { void NotifyDoneWithAdsCallLocked() { if (!ads_done_) { ads_done_ = true; - ads_cond_.Broadcast(); + ads_cond_.SignalAll(); } } @@ -793,9 +794,10 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> { grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10); { grpc_core::MutexLock lock(&parent_->ads_mu_); - if (!parent_->ads_cond_.WaitUntil( - &parent_->ads_mu_, [this] { return parent_->ads_done_; }, - deadline)) { + if (!grpc_core::WaitUntilWithDeadline( + &parent_->ads_cond_, &parent_->ads_mu_, + [this] { return parent_->ads_done_; }, + grpc_core::ToAbslTime(deadline))) { break; } } @@ -1207,8 +1209,8 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> { grpc_core::CondVar cv; if (result_queue_.empty()) { load_report_cond_ = &cv; - load_report_cond_->WaitUntil(&load_report_mu_, - [this] { return !result_queue_.empty(); }); + grpc_core::WaitUntil(load_report_cond_, &load_report_mu_, + [this] { return !result_queue_.empty(); }); load_report_cond_ = nullptr; } std::vector<ClientStats> result = std::move(result_queue_.front()); @@ -1275,8 +1277,8 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> { } // Wait until notified done. grpc_core::MutexLock lock(&parent_->lrs_mu_); - parent_->lrs_cv_.WaitUntil(&parent_->lrs_mu_, - [this] { return parent_->lrs_done_; }); + grpc_core::WaitUntil(&parent_->lrs_cv_, &parent_->lrs_mu_, + [this] { return parent_->lrs_done_; }); } gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this); return Status::OK; @@ -1289,7 +1291,7 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> { void NotifyDoneWithLrsCallLocked() { if (!lrs_done_) { lrs_done_ = true; - lrs_cv_.Broadcast(); + lrs_cv_.SignalAll(); } }