Merge pull request #25404 from veblush/mutex-api2

Fix-forward: Made grpc_core::Mutex compatible to absl::Mutex
pull/25310/head
Esun Kim 4 years ago committed by GitHub
commit 8efa9f9142
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/core/ext/filters/client_channel/client_channel.cc
  2. 4
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  3. 4
      src/core/ext/filters/client_channel/subchannel.cc
  4. 67
      src/core/ext/filters/max_age/max_age_filter.cc
  5. 7
      src/core/lib/channel/handshaker.cc
  6. 2
      src/core/lib/channel/handshaker.h
  7. 4
      src/core/lib/gprpp/mpscq.cc
  8. 169
      src/core/lib/gprpp/sync.h
  9. 17
      src/core/lib/iomgr/ev_apple.cc
  10. 8
      src/core/lib/iomgr/ev_epollex_linux.cc
  11. 13
      src/core/lib/security/credentials/google_default/google_default_credentials.cc
  12. 4
      src/core/lib/security/transport/security_handshaker.cc
  13. 2
      src/core/lib/slice/slice_intern.cc
  14. 28
      src/core/lib/surface/init.cc
  15. 2
      src/core/lib/surface/server.cc
  16. 2
      src/core/lib/transport/metadata.cc
  17. 37
      src/core/tsi/alts/handshaker/alts_handshaker_client.cc
  18. 37
      src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
  19. 2
      src/core/tsi/ssl/session_cache/ssl_session_cache.cc
  20. 3
      src/core/tsi/ssl/session_cache/ssl_session_cache.h
  21. 4
      src/cpp/server/dynamic_thread_pool.cc
  22. 2
      src/cpp/server/load_reporter/load_reporter.cc
  23. 8
      src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
  24. 10
      src/cpp/thread_manager/thread_manager.cc
  25. 2
      test/core/handshake/server_ssl_common.cc
  26. 6
      test/core/iomgr/stranded_event_test.cc
  27. 5
      test/cpp/common/time_jump_test.cc
  28. 20
      test/cpp/end2end/xds_end2end_test.cc

@ -389,7 +389,7 @@ class ChannelData {
// Fields guarded by a mutex, since they need to be accessed
// synchronously via get_channel_info().
//
gpr_mu info_mu_;
Mutex info_mu_;
UniquePtr<char> info_lb_policy_name_;
UniquePtr<char> info_service_config_json_;
@ -1885,8 +1885,6 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
this, owning_stack_);
}
// Initialize data members.
gpr_mu_init(&info_mu_);
// Start backup polling.
grpc_client_channel_start_backup_polling(interested_parties_);
// Check client channel factory.
@ -1955,7 +1953,6 @@ ChannelData::~ChannelData() {
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
gpr_mu_destroy(&info_mu_);
}
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(

@ -151,7 +151,7 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) {
// If the write failed or we're shutting down, clean up and invoke the
// callback with the error.
handshaker->HandshakeFailedLocked(GRPC_ERROR_REF(error));
lock.Unlock();
lock.Release();
handshaker->Unref();
} else {
// Otherwise, read the response.
@ -256,7 +256,7 @@ done:
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
handshaker->is_shutdown_ = true;
lock.Unlock();
lock.Release();
handshaker->Unref();
}

@ -1023,9 +1023,9 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->ContinueConnectingLocked();
lock.Unlock();
lock.Release();
} else {
lock.Unlock();
lock.Release();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
}
GRPC_ERROR_UNREF(error);

@ -54,16 +54,16 @@ struct channel_data {
grpc_channel_stack* channel_stack;
/* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
and max_age_grace_timer_pending */
gpr_mu max_age_timer_mu;
grpc_core::Mutex max_age_timer_mu;
/* True if the max_age timer callback is currently pending */
bool max_age_timer_pending;
bool max_age_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false;
/* True if the max_age_grace timer callback is currently pending */
bool max_age_grace_timer_pending;
bool max_age_grace_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false;
/* The timer for checking if the channel has reached its max age */
grpc_timer max_age_timer;
grpc_timer max_age_timer ABSL_GUARDED_BY(max_age_timer_mu);
/* The timer for checking if the max-aged channel has uesed up the grace
period */
grpc_timer max_age_grace_timer;
grpc_timer max_age_grace_timer ABSL_GUARDED_BY(max_age_timer_mu);
/* The timer for checking if the channel's idle duration reaches
max_connection_idle */
grpc_timer max_idle_timer;
@ -260,13 +260,15 @@ class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
static void start_max_age_timer_after_init(void* arg, grpc_error* /*error*/) {
channel_data* chand = static_cast<channel_data*>(arg);
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
grpc_timer_init(&chand->max_age_timer,
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age,
&chand->close_max_age_channel);
gpr_mu_unlock(&chand->max_age_timer_mu);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
grpc_timer_init(
&chand->max_age_timer,
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age,
&chand->close_max_age_channel);
}
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand));
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
@ -278,16 +280,17 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* /*error*/) {
static void start_max_age_grace_timer_after_goaway_op(void* arg,
grpc_error* /*error*/) {
channel_data* chand = static_cast<channel_data*>(arg);
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
grpc_timer_init(
&chand->max_age_grace_timer,
chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE
? GRPC_MILLIS_INF_FUTURE
: grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age_grace,
&chand->force_close_max_age_channel);
gpr_mu_unlock(&chand->max_age_timer_mu);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
grpc_timer_init(&chand->max_age_grace_timer,
chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE
? GRPC_MILLIS_INF_FUTURE
: grpc_core::ExecCtx::Get()->Now() +
chand->max_connection_age_grace,
&chand->force_close_max_age_channel);
}
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack,
"max_age start_max_age_grace_timer_after_goaway_op");
}
@ -350,9 +353,10 @@ static void max_idle_timer_cb(void* arg, grpc_error* error) {
static void close_max_age_channel(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
}
if (error == GRPC_ERROR_NONE) {
GRPC_CHANNEL_STACK_REF(chand->channel_stack,
"max_age start_max_age_grace_timer_after_goaway_op");
@ -372,9 +376,10 @@ static void close_max_age_channel(void* arg, grpc_error* error) {
static void force_close_max_age_channel(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = false;
}
if (error == GRPC_ERROR_NONE) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error =
@ -426,9 +431,7 @@ static void max_age_destroy_call_elem(
static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
gpr_mu_init(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
chand->max_age_grace_timer_pending = false;
new (chand) channel_data();
chand->channel_stack = args->channel_stack;
chand->max_connection_age =
add_random_max_connection_age_jitter_and_convert_to_grpc_millis(
@ -513,7 +516,7 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
/* Destructor for channel_data. */
static void max_age_destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
gpr_mu_destroy(&chand->max_age_timer_mu);
chand->~channel_data();
}
const grpc_channel_filter grpc_max_age_filter = {

@ -53,7 +53,7 @@ std::string HandshakerArgsString(HandshakerArgs* args) {
} // namespace
HandshakeManager::HandshakeManager() { gpr_mu_init(&mu_); }
HandshakeManager::HandshakeManager() {}
/// Add \a mgr to the server side list of all pending handshake managers, the
/// list starts with \a *head.
@ -104,10 +104,7 @@ void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
handshakers_.push_back(std::move(handshaker));
}
HandshakeManager::~HandshakeManager() {
handshakers_.clear();
gpr_mu_destroy(&mu_);
}
HandshakeManager::~HandshakeManager() { handshakers_.clear(); }
void HandshakeManager::Shutdown(grpc_error* why) {
{

@ -144,7 +144,7 @@ class HandshakeManager : public RefCounted<HandshakeManager> {
static const size_t HANDSHAKERS_INIT_SIZE = 2;
gpr_mu mu_;
Mutex mu_;
bool is_shutdown_ = false;
// An array of handshakers added via grpc_handshake_manager_add().
absl::InlinedVector<RefCountedPtr<Handshaker>, HANDSHAKERS_INIT_SIZE>

@ -86,9 +86,9 @@ bool LockedMultiProducerSingleConsumerQueue::Push(Node* node) {
LockedMultiProducerSingleConsumerQueue::Node*
LockedMultiProducerSingleConsumerQueue::TryPop() {
if (gpr_mu_trylock(mu_.get())) {
if (mu_.TryLock()) {
Node* node = queue_.Pop();
gpr_mu_unlock(mu_.get());
mu_.Unlock();
return node;
}
return nullptr;

@ -26,6 +26,9 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "absl/synchronization/mutex.h"
#include "src/core/lib/gprpp/time_util.h"
// The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in
@ -37,7 +40,23 @@
namespace grpc_core {
class Mutex {
#ifdef GPR_ABSEIL_SYNC
using Mutex = absl::Mutex;
using MutexLock = absl::MutexLock;
using ReleasableMutexLock = absl::ReleasableMutexLock;
using CondVar = absl::CondVar;
// Returns the underlying gpr_mu from Mutex. This should be used only when
// it has to like passing the C++ mutex to C-core API.
// TODO(veblush): Remove this after C-core no longer uses gpr_mu.
inline gpr_mu* GetUnderlyingGprMu(Mutex* mutex) {
return reinterpret_cast<gpr_mu*>(mutex);
}
#else
class ABSL_LOCKABLE Mutex {
public:
Mutex() { gpr_mu_init(&mu_); }
~Mutex() { gpr_mu_destroy(&mu_); }
@ -45,52 +64,59 @@ class Mutex {
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
gpr_mu* get() { return &mu_; }
const gpr_mu* get() const { return &mu_; }
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { gpr_mu_lock(&mu_); }
void Unlock() ABSL_UNLOCK_FUNCTION() { gpr_mu_unlock(&mu_); }
bool TryLock() ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
return gpr_mu_trylock(&mu_) != 0;
}
private:
gpr_mu mu_;
friend class CondVar;
friend gpr_mu* GetUnderlyingGprMu(Mutex* mutex);
};
// MutexLock is a std::
class MutexLock {
// Returns the underlying gpr_mu from Mutex. This should be used only when
// it has to like passing the C++ mutex to C-core API.
// TODO(veblush): Remove this after C-core no longer uses gpr_mu.
inline gpr_mu* GetUnderlyingGprMu(Mutex* mutex) { return &mutex->mu_; }
class ABSL_SCOPED_LOCKABLE MutexLock {
public:
explicit MutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~MutexLock() { gpr_mu_unlock(mu_); }
explicit MutexLock(Mutex* mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) : mu_(mu) {
mu_->Lock();
}
~MutexLock() ABSL_UNLOCK_FUNCTION() { mu_->Unlock(); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
Mutex* const mu_;
};
class ReleasableMutexLock {
class ABSL_SCOPED_LOCKABLE ReleasableMutexLock {
public:
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~ReleasableMutexLock() {
if (!released_) gpr_mu_unlock(mu_);
explicit ReleasableMutexLock(Mutex* mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu)
: mu_(mu) {
mu_->Lock();
}
~ReleasableMutexLock() ABSL_UNLOCK_FUNCTION() {
if (!released_) mu_->Unlock();
}
ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Lock() {
GPR_DEBUG_ASSERT(released_);
gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
void Release() ABSL_UNLOCK_FUNCTION() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
gpr_mu_unlock(mu_);
mu_->Unlock();
}
private:
gpr_mu* const mu_;
Mutex* const mu_;
bool released_ = false;
};
@ -103,31 +129,94 @@ class CondVar {
CondVar& operator=(const CondVar&) = delete;
void Signal() { gpr_cv_signal(&cv_); }
void Broadcast() { gpr_cv_broadcast(&cv_); }
void SignalAll() { gpr_cv_broadcast(&cv_); }
void Wait(Mutex* mu) { WaitWithDeadline(mu, absl::InfiniteFuture()); }
bool WaitWithTimeout(Mutex* mu, absl::Duration timeout) {
return gpr_cv_wait(&cv_, &mu->mu_, ToGprTimeSpec(timeout)) != 0;
}
bool WaitWithDeadline(Mutex* mu, absl::Time deadline) {
return gpr_cv_wait(&cv_, &mu->mu_, ToGprTimeSpec(deadline)) != 0;
}
private:
gpr_cv cv_;
};
#endif // GPR_ABSEIL_SYNC
template <typename Predicate>
static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) {
while (!pred()) {
cv->Wait(mu);
}
}
// Returns true iff we timed-out
template <typename Predicate>
static bool WaitUntilWithTimeout(CondVar* cv, Mutex* mu, Predicate pred,
absl::Duration timeout) {
while (!pred()) {
if (cv->WaitWithTimeout(mu, timeout)) return true;
}
return false;
}
// Returns true iff we timed-out
template <typename Predicate>
static bool WaitUntilWithDeadline(CondVar* cv, Mutex* mu, Predicate pred,
absl::Time deadline) {
while (!pred()) {
if (cv->WaitWithDeadline(mu, deadline)) return true;
}
return false;
}
// Deprecated. Prefer MutexLock
class MutexLockForGprMu {
public:
explicit MutexLockForGprMu(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~MutexLockForGprMu() { gpr_mu_unlock(mu_); }
MutexLockForGprMu(const MutexLock&) = delete;
MutexLockForGprMu& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
int Wait(Mutex* mu) { return Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); }
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return gpr_cv_wait(&cv_, mu->get(), deadline);
// Deprecated. Prefer MutexLock or ReleasableMutexLock
class ABSL_SCOPED_LOCKABLE LockableAndReleasableMutexLock {
public:
explicit LockableAndReleasableMutexLock(Mutex* mu)
ABSL_EXCLUSIVE_LOCK_FUNCTION(mu)
: mu_(mu) {
mu_->Lock();
}
~LockableAndReleasableMutexLock() ABSL_UNLOCK_FUNCTION() {
if (!released_) mu_->Unlock();
}
LockableAndReleasableMutexLock(const LockableAndReleasableMutexLock&) =
delete;
LockableAndReleasableMutexLock& operator=(
const LockableAndReleasableMutexLock&) = delete;
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() {
GPR_DEBUG_ASSERT(released_);
mu_->Lock();
released_ = false;
}
// Returns true iff we timed-out
template <typename Predicate>
bool WaitUntil(Mutex* mu, Predicate pred, const gpr_timespec& deadline) {
while (!pred()) {
if (Wait(mu, deadline)) return true;
}
return false;
void Release() ABSL_UNLOCK_FUNCTION() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
mu_->Unlock();
}
private:
gpr_cv cv_;
Mutex* const mu_;
bool released_ = false;
};
} // namespace grpc_core

@ -33,7 +33,10 @@
#include <list>
#include "absl/time/time.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time_util.h"
#include "src/core/lib/iomgr/ev_apple.h"
grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling");
@ -161,7 +164,7 @@ void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
/// Drive the run loop in a global singleton thread until the global run loop is
/// shutdown.
static void GlobalRunLoopFunc(void* arg) {
grpc_core::ReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
gGlobalRunLoopContext->init_cv.Signal();
@ -173,11 +176,11 @@ static void GlobalRunLoopFunc(void* arg) {
gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
}
gGlobalRunLoopContext->input_source_registered = false;
lock.Unlock();
lock.Release();
CFRunLoopRun();
lock.Lock();
}
lock.Unlock();
lock.Release();
}
// pollset implementation
@ -237,9 +240,9 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
auto it = apple_pollset->workers.begin();
while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
if (actual_worker.cv.Wait(
&apple_pollset->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
if (actual_worker.cv.WaitWithDeadline(
&apple_pollset->mu, grpc_core::ToAbslTime(grpc_millis_to_timespec(
deadline, GPR_CLOCK_REALTIME)))) {
// timed out
break;
}
@ -299,7 +302,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset,
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
GRPC_POLLING_TRACE("pollset init: %p", pollset);
GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
*mu = apple_pollset->mu.get();
*mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu);
}
/// The caller must acquire the lock GrpcApplePollset.mu before calling this

@ -537,7 +537,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) {
const int epfd = pollset->active_pollable->epfd;
grpc_core::MutexLock lock(&fd->pollable_mu);
grpc_core::MutexLockForGprMu lock(&fd->pollable_mu);
for (size_t i = 0; i < fd->pollset_fds.size(); ++i) {
if (fd->pollset_fds[i] == epfd) {
return true;
@ -548,7 +548,7 @@ static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) {
static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) {
const int epfd = pollset->active_pollable->epfd;
grpc_core::MutexLock lock(&fd->pollable_mu);
grpc_core::MutexLockForGprMu lock(&fd->pollable_mu);
fd->pollset_fds.push_back(epfd);
}
@ -684,7 +684,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
grpc_core::MutexLock lock(&p->mu);
grpc_core::MutexLockForGprMu lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
@ -1296,7 +1296,7 @@ static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
return;
}
grpc_core::MutexLock lock(&pollset->mu);
grpc_core::MutexLockForGprMu lock(&pollset->mu);
grpc_error* error = pollset_add_fd_locked(pollset, fd);
// If we are in PO_MULTI mode, we should update the pollsets of the FD.

@ -61,7 +61,7 @@ using grpc_core::Json;
* means the detection is done via network test that is unreliable and the
* unreliable result should not be referred by successive calls. */
static int g_metadata_server_available = 0;
static gpr_mu g_state_mu;
static grpc_core::Mutex* g_state_mu;
/* Protect a metadata_server_detector instance that can be modified by more than
* one gRPC threads */
static gpr_mu* g_polling_mu;
@ -69,7 +69,9 @@ static gpr_once g_once = GPR_ONCE_INIT;
static grpc_core::internal::grpc_gce_tenancy_checker g_gce_tenancy_checker =
grpc_alts_is_running_on_gcp;
static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); }
static void init_default_credentials(void) {
g_state_mu = new grpc_core::Mutex();
}
struct metadata_server_detector {
grpc_polling_entity pollent;
@ -282,7 +284,7 @@ end:
static void update_tenancy() {
gpr_once_init(&g_once, init_default_credentials);
grpc_core::MutexLock lock(&g_state_mu);
grpc_core::MutexLock lock(g_state_mu);
/* Try a platform-provided hint for GCE. */
if (!g_metadata_server_available) {
@ -297,7 +299,7 @@ static void update_tenancy() {
}
static bool metadata_server_available() {
grpc_core::MutexLock lock(&g_state_mu);
grpc_core::MutexLock lock(g_state_mu);
return static_cast<bool>(g_metadata_server_available);
}
@ -387,9 +389,8 @@ void set_gce_tenancy_checker_for_testing(grpc_gce_tenancy_checker checker) {
void grpc_flush_cached_google_default_credentials(void) {
grpc_core::ExecCtx exec_ctx;
gpr_once_init(&g_once, init_default_credentials);
gpr_mu_lock(&g_state_mu);
grpc_core::MutexLock lock(g_state_mu);
g_metadata_server_available = 0;
gpr_mu_unlock(&g_state_mu);
}
} // namespace internal

@ -84,7 +84,7 @@ class SecurityHandshaker : public Handshaker {
tsi_handshaker* handshaker_;
RefCountedPtr<grpc_security_connector> connector_;
gpr_mu mu_;
Mutex mu_;
bool is_shutdown_ = false;
// Endpoint and read buffer to destroy after a shutdown.
@ -120,14 +120,12 @@ SecurityHandshaker::SecurityHandshaker(tsi_handshaker* handshaker,
max_frame_size_ = grpc_channel_arg_get_integer(
arg, {0, 0, std::numeric_limits<int>::max()});
}
gpr_mu_init(&mu_);
grpc_slice_buffer_init(&outgoing_);
GRPC_CLOSURE_INIT(&on_peer_checked_, &SecurityHandshaker::OnPeerCheckedFn,
this, grpc_schedule_on_exec_ctx);
}
SecurityHandshaker::~SecurityHandshaker() {
gpr_mu_destroy(&mu_);
tsi_handshaker_destroy(handshaker_);
tsi_handshaker_result_destroy(handshaker_result_);
if (endpoint_to_destroy_ != nullptr) {

@ -69,7 +69,7 @@ static bool g_forced_hash_seed = false;
InternedSliceRefcount::~InternedSliceRefcount() {
slice_shard* shard = &g_shards[SHARD_IDX(this->hash)];
MutexLock lock(&shard->mu);
MutexLockForGprMu lock(&shard->mu);
InternedSliceRefcount** prev_next;
InternedSliceRefcount* cur;
for (prev_next = &shard->strs[TABLE_IDX(this->hash, shard->capacity)],

@ -63,16 +63,15 @@ extern void grpc_register_built_in_plugins(void);
#define MAX_PLUGINS 128
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static grpc_core::Mutex* g_init_mu;
static int g_initializations;
static gpr_cv* g_shutting_down_cv;
static grpc_core::CondVar* g_shutting_down_cv;
static bool g_shutting_down;
static void do_basic_init(void) {
gpr_log_verbosity_init();
gpr_mu_init(&g_init_mu);
g_shutting_down_cv = static_cast<gpr_cv*>(malloc(sizeof(gpr_cv)));
gpr_cv_init(g_shutting_down_cv);
g_init_mu = new grpc_core::Mutex();
g_shutting_down_cv = new grpc_core::CondVar();
g_shutting_down = false;
grpc_register_built_in_plugins();
grpc_cq_global_init();
@ -130,11 +129,11 @@ void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(&g_init_mu);
grpc_core::MutexLock lock(g_init_mu);
if (++g_initializations == 1) {
if (g_shutting_down) {
g_shutting_down = false;
gpr_cv_broadcast(g_shutting_down_cv);
g_shutting_down_cv->SignalAll();
}
grpc_core::Fork::GlobalInit();
grpc_fork_handlers_auto_register();
@ -196,14 +195,14 @@ void grpc_shutdown_internal_locked(void) {
grpc_core::ExecCtx::GlobalShutdown();
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();
g_shutting_down = false;
gpr_cv_broadcast(g_shutting_down_cv);
g_shutting_down_cv->SignalAll();
// Absolute last action will be to delete static metadata context.
grpc_destroy_static_metadata_ctx();
}
void grpc_shutdown_internal(void* /*ignored*/) {
GRPC_API_TRACE("grpc_shutdown_internal", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
grpc_core::MutexLock lock(g_init_mu);
// We have released lock from the shutdown thread and it is possible that
// another grpc_init has been called, and do nothing if that is the case.
if (--g_initializations != 0) {
@ -214,7 +213,7 @@ void grpc_shutdown_internal(void* /*ignored*/) {
void grpc_shutdown(void) {
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
grpc_core::MutexLock lock(g_init_mu);
if (--g_initializations == 0) {
grpc_core::ApplicationCallbackExecCtx* acec =
@ -243,7 +242,7 @@ void grpc_shutdown(void) {
void grpc_shutdown_blocking(void) {
GRPC_API_TRACE("grpc_shutdown_blocking(void)", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
grpc_core::MutexLock lock(g_init_mu);
if (--g_initializations == 0) {
g_shutting_down = true;
grpc_shutdown_internal_locked();
@ -253,16 +252,15 @@ void grpc_shutdown_blocking(void) {
int grpc_is_initialized(void) {
int r;
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(&g_init_mu);
grpc_core::MutexLock lock(g_init_mu);
r = g_initializations > 0;
return r;
}
void grpc_maybe_wait_for_async_shutdown(void) {
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(&g_init_mu);
grpc_core::MutexLock lock(g_init_mu);
while (g_shutting_down) {
gpr_cv_wait(g_shutting_down_cv, &g_init_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
g_shutting_down_cv->Wait(g_init_mu);
}
}

@ -796,7 +796,7 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
{
// Wait for startup to be finished. Locks mu_global.
MutexLock lock(&mu_global_);
starting_cv_.WaitUntil(&mu_global_, [this] { return !starting_; });
WaitUntil(&starting_cv_, &mu_global_, [this] { return !starting_; });
// Stay locked, and gather up some stuff to do.
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (shutdown_published_) {

@ -596,7 +596,7 @@ static void* set_user_data(UserData* ud, void (*destroy_func)(void*),
grpc_core::ReleasableMutexLock lock(&ud->mu_user_data);
if (ud->destroy_user_data.Load(grpc_core::MemoryOrder::RELAXED)) {
/* user data can only be set once */
lock.Unlock();
lock.Release();
if (destroy_func != nullptr) {
destroy_func(data);
}

@ -69,9 +69,9 @@ typedef struct alts_grpc_handshaker_client {
grpc_closure on_handshaker_service_resp_recv;
/* Buffers containing information to be sent (or received) to (or from) the
* handshaker service. */
grpc_byte_buffer* send_buffer;
grpc_byte_buffer* recv_buffer;
grpc_status_code status;
grpc_byte_buffer* send_buffer = nullptr;
grpc_byte_buffer* recv_buffer = nullptr;
grpc_status_code status = GRPC_STATUS_OK;
/* Initial metadata to be received from handshaker service. */
grpc_metadata_array recv_initial_metadata;
/* A callback function provided by an application to be invoked when response
@ -95,15 +95,15 @@ typedef struct alts_grpc_handshaker_client {
/** callback for receiving handshake call status */
grpc_closure on_status_received;
/** gRPC status code of handshake call */
grpc_status_code handshake_status_code;
grpc_status_code handshake_status_code = GRPC_STATUS_OK;
/** gRPC status details of handshake call */
grpc_slice handshake_status_details;
/* mu synchronizes all fields below including their internal fields. */
gpr_mu mu;
grpc_core::Mutex mu;
/* indicates if the handshaker call's RECV_STATUS_ON_CLIENT op is done. */
bool receive_status_finished;
bool receive_status_finished = false;
/* if non-null, contains arguments to complete a TSI next callback. */
recv_message_result* pending_recv_message_result;
recv_message_result* pending_recv_message_result = nullptr;
/* Maximum frame size used by frame protector. */
size_t max_frame_size;
} alts_grpc_handshaker_client;
@ -140,8 +140,7 @@ static void alts_grpc_handshaker_client_unref(
grpc_alts_credentials_options_destroy(client->options);
gpr_free(client->buffer);
grpc_slice_unref_internal(client->handshake_status_details);
gpr_mu_destroy(&client->mu);
gpr_free(client);
delete client;
}
}
@ -695,24 +694,24 @@ alts_handshaker_client* alts_grpc_handshaker_client_create(
gpr_log(GPR_ERROR, "Invalid arguments to alts_handshaker_client_create()");
return nullptr;
}
alts_grpc_handshaker_client* client =
static_cast<alts_grpc_handshaker_client*>(gpr_zalloc(sizeof(*client)));
gpr_mu_init(&client->mu);
alts_grpc_handshaker_client* client = new alts_grpc_handshaker_client();
memset(&client->base, 0, sizeof(client->base));
client->base.vtable =
vtable_for_testing == nullptr ? &vtable : vtable_for_testing;
gpr_ref_init(&client->refs, 1);
client->grpc_caller = grpc_call_start_batch_and_execute;
client->handshaker = handshaker;
client->grpc_caller = grpc_call_start_batch_and_execute;
grpc_metadata_array_init(&client->recv_initial_metadata);
client->cb = cb;
client->user_data = user_data;
client->send_buffer = nullptr;
client->recv_buffer = nullptr;
client->options = grpc_alts_credentials_options_copy(options);
client->target_name = grpc_slice_copy(target_name);
client->recv_bytes = grpc_empty_slice();
grpc_metadata_array_init(&client->recv_initial_metadata);
client->is_client = is_client;
client->max_frame_size = max_frame_size;
client->recv_bytes = grpc_empty_slice();
client->buffer_size = TSI_ALTS_INITIAL_BUFFER_SIZE;
client->buffer = static_cast<unsigned char*>(gpr_zalloc(client->buffer_size));
client->handshake_status_details = grpc_empty_slice();
client->max_frame_size = max_frame_size;
grpc_slice slice = grpc_slice_from_copied_string(handshaker_service_url);
client->call =
strcmp(handshaker_service_url, ALTS_HANDSHAKER_SERVICE_URL_FOR_TESTING) ==
@ -722,8 +721,6 @@ alts_handshaker_client* alts_grpc_handshaker_client_create(
channel, nullptr, GRPC_PROPAGATE_DEFAULTS, interested_parties,
grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice,
GRPC_MILLIS_INF_FUTURE, nullptr);
client->base.vtable =
vtable_for_testing == nullptr ? &vtable : vtable_for_testing;
GRPC_CLOSURE_INIT(&client->on_handshaker_service_resp_recv, grpc_cb, client,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&client->on_status_received, on_status_received, client,

@ -48,23 +48,23 @@ struct alts_tsi_handshaker {
tsi_handshaker base;
grpc_slice target_name;
bool is_client;
bool has_sent_start_message;
bool has_created_handshaker_client;
bool has_sent_start_message = false;
bool has_created_handshaker_client = false;
char* handshaker_service_url;
grpc_pollset_set* interested_parties;
grpc_alts_credentials_options* options;
alts_handshaker_client_vtable* client_vtable_for_testing;
grpc_channel* channel;
alts_handshaker_client_vtable* client_vtable_for_testing = nullptr;
grpc_channel* channel = nullptr;
bool use_dedicated_cq;
// mu synchronizes all fields below. Note these are the
// only fields that can be concurrently accessed (due to
// potential concurrency of tsi_handshaker_shutdown and
// tsi_handshaker_next).
gpr_mu mu;
alts_handshaker_client* client;
grpc_core::Mutex mu;
alts_handshaker_client* client = nullptr;
// shutdown effectively follows base.handshake_shutdown,
// but is synchronized by the mutex of this object.
bool shutdown;
bool shutdown = false;
// Maximum frame size used by frame protector.
size_t max_frame_size;
};
@ -592,8 +592,7 @@ static void handshaker_destroy(tsi_handshaker* self) {
grpc_channel_destroy_internal(handshaker->channel);
}
gpr_free(handshaker->handshaker_service_url);
gpr_mu_destroy(&handshaker->mu);
gpr_free(handshaker);
delete handshaker;
}
static const tsi_handshaker_vtable handshaker_vtable = {
@ -628,26 +627,22 @@ tsi_result alts_tsi_handshaker_create(
gpr_log(GPR_ERROR, "Invalid arguments to alts_tsi_handshaker_create()");
return TSI_INVALID_ARGUMENT;
}
alts_tsi_handshaker* handshaker =
static_cast<alts_tsi_handshaker*>(gpr_zalloc(sizeof(*handshaker)));
gpr_mu_init(&handshaker->mu);
handshaker->use_dedicated_cq = interested_parties == nullptr;
handshaker->client = nullptr;
handshaker->is_client = is_client;
handshaker->has_sent_start_message = false;
bool use_dedicated_cq = interested_parties == nullptr;
alts_tsi_handshaker* handshaker = new alts_tsi_handshaker();
memset(&handshaker->base, 0, sizeof(handshaker->base));
handshaker->base.vtable =
use_dedicated_cq ? &handshaker_vtable_dedicated : &handshaker_vtable;
handshaker->target_name = target_name == nullptr
? grpc_empty_slice()
: grpc_slice_from_static_string(target_name);
handshaker->interested_parties = interested_parties;
handshaker->has_created_handshaker_client = false;
handshaker->is_client = is_client;
handshaker->handshaker_service_url = gpr_strdup(handshaker_service_url);
handshaker->interested_parties = interested_parties;
handshaker->options = grpc_alts_credentials_options_copy(options);
handshaker->use_dedicated_cq = use_dedicated_cq;
handshaker->max_frame_size = user_specified_max_frame_size != 0
? user_specified_max_frame_size
: kTsiAltsMaxFrameSize;
handshaker->base.vtable = handshaker->use_dedicated_cq
? &handshaker_vtable_dedicated
: &handshaker_vtable;
*self = &handshaker->base;
return TSI_OK;
}

@ -84,7 +84,6 @@ class SslSessionLRUCache::Node {
SslSessionLRUCache::SslSessionLRUCache(size_t capacity) : capacity_(capacity) {
GPR_ASSERT(capacity > 0);
gpr_mu_init(&lock_);
entry_by_key_ = grpc_avl_create(&cache_avl_vtable);
}
@ -96,7 +95,6 @@ SslSessionLRUCache::~SslSessionLRUCache() {
node = next;
}
grpc_avl_unref(entry_by_key_, nullptr);
gpr_mu_destroy(&lock_);
}
size_t SslSessionLRUCache::Size() {

@ -31,6 +31,7 @@ extern "C" {
#include "src/core/lib/avl/avl.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/tsi/ssl/session_cache/ssl_session.h"
/// Cache for SSL sessions for sessions resumption.
@ -76,7 +77,7 @@ class SslSessionLRUCache : public grpc_core::RefCounted<SslSessionLRUCache> {
void PushFront(Node* node);
void AssertInvariants();
gpr_mu lock_;
grpc_core::Mutex lock_;
size_t capacity_;
Node* use_order_list_head_ = nullptr;

@ -68,7 +68,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!callbacks_.empty()) {
auto cb = callbacks_.front();
callbacks_.pop();
lock.Unlock();
lock.Release();
cb();
} else if (shutdown_) {
break;
@ -97,7 +97,7 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
DynamicThreadPool::~DynamicThreadPool() {
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_.Broadcast();
cv_.SignalAll();
while (nthreads_ != 0) {
shutdown_cv_.Wait(&mu_);
}

@ -279,7 +279,7 @@ LoadReporter::GenerateLoadBalancingFeedback() {
double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
std::chrono::duration<double> duration_seconds =
newest->end_time - oldest->end_time;
lock.Unlock();
lock.Release();
::grpc::lb::v1::LoadBalancingFeedback feedback;
feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
feedback.set_calls_per_second(

@ -171,7 +171,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Unlock();
lock.Release();
Shutdown(std::move(self), "OnRequestDelivered");
return;
}
@ -229,7 +229,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Unlock();
lock.Release();
Shutdown(std::move(self), "OnReadDone");
return;
}
@ -261,7 +261,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Unlock();
lock.Release();
Shutdown(std::move(self), "ScheduleNextReport");
return;
}
@ -301,7 +301,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Unlock();
lock.Release();
Shutdown(std::move(self), "SendReport");
return;
}

@ -152,7 +152,7 @@ void ThreadManager::MainWorkLoop() {
bool ok;
WorkStatus work_status = PollForWork(&tag, &ok);
grpc_core::ReleasableMutexLock lock(&mu_);
grpc_core::LockableAndReleasableMutexLock lock(&mu_);
// Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--;
bool done = false;
@ -179,7 +179,7 @@ void ThreadManager::MainWorkLoop() {
max_active_threads_sofar_ = num_threads_;
}
// Drop lock before spawning thread to avoid contention
lock.Unlock();
lock.Release();
WorkerThread* worker = new WorkerThread(this);
if (worker->created()) {
worker->Start();
@ -195,17 +195,17 @@ void ThreadManager::MainWorkLoop() {
// There is still at least some thread polling, so we can go on
// even though we are below the number of pollers that we would
// like to have (min_pollers_)
lock.Unlock();
lock.Release();
} else {
// There are no pollers to spare and we couldn't allocate
// a new thread, so resources are exhausted!
lock.Unlock();
lock.Release();
resource_exhausted = true;
}
} else {
// There are a sufficient number of pollers available so we can do
// the work and continue polling with our existing poller threads
lock.Unlock();
lock.Release();
}
// Lock is always released at this point - do the application work
// or return resource exhausted if there is new work but we couldn't

@ -86,7 +86,7 @@ class ServerInfo {
void Await() {
grpc_core::MutexLock lock(&mu_);
cv_.WaitUntil(&mu_, [this] { return ready_; });
grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; });
}
private:

@ -394,7 +394,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
for (int i = 1; i <= kNumMessagePingPongsPerCall; i++) {
{
grpc_core::MutexLock lock(&ping_pong_round_mu);
ping_pong_round_cv.Broadcast();
ping_pong_round_cv.SignalAll();
while (int(ping_pong_round) != i) {
ping_pong_round_cv.Wait(&ping_pong_round_mu);
}
@ -404,7 +404,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
{
grpc_core::MutexLock lock(&ping_pong_round_mu);
ping_pongs_done++;
ping_pong_round_cv.Broadcast();
ping_pong_round_cv.SignalAll();
}
}
gpr_log(GPR_DEBUG, "now receive status on call with server address:%s",
@ -425,7 +425,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
ping_pong_round_cv.Wait(&ping_pong_round_mu);
}
ping_pong_round++;
ping_pong_round_cv.Broadcast();
ping_pong_round_cv.SignalAll();
gpr_log(GPR_DEBUG, "initiate ping pong round: %ld", ping_pong_round);
}
}

@ -34,6 +34,8 @@
#include "src/core/lib/iomgr/timer_manager.h"
#include "test/core/util/test_config.h"
#include "absl/time/time.h"
extern char** environ;
#ifdef GPR_ANDROID
@ -116,8 +118,7 @@ TEST_P(TimeJumpTest, TimedWait) {
run_cmd(cmd.str().c_str());
});
gpr_timespec before = gpr_now(GPR_CLOCK_MONOTONIC);
int timedout = cond.Wait(
&mu, grpc_millis_to_timespec(kWaitTimeMs, GPR_CLOCK_REALTIME));
bool timedout = cond.WaitWithTimeout(&mu, absl::Milliseconds(kWaitTimeMs));
gpr_timespec after = gpr_now(GPR_CLOCK_MONOTONIC);
int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(after, before));
gpr_log(GPR_DEBUG, "After wait, timedout = %d elapsed_ms = %d", timedout,

@ -62,6 +62,7 @@
#include "src/core/lib/gpr/tmpfile.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time_util.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/iomgr/parse_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -640,7 +641,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
void NotifyDoneWithAdsCallLocked() {
if (!ads_done_) {
ads_done_ = true;
ads_cond_.Broadcast();
ads_cond_.SignalAll();
}
}
@ -793,9 +794,10 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10);
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (!parent_->ads_cond_.WaitUntil(
&parent_->ads_mu_, [this] { return parent_->ads_done_; },
deadline)) {
if (!grpc_core::WaitUntilWithDeadline(
&parent_->ads_cond_, &parent_->ads_mu_,
[this] { return parent_->ads_done_; },
grpc_core::ToAbslTime(deadline))) {
break;
}
}
@ -1207,8 +1209,8 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
grpc_core::CondVar cv;
if (result_queue_.empty()) {
load_report_cond_ = &cv;
load_report_cond_->WaitUntil(&load_report_mu_,
[this] { return !result_queue_.empty(); });
grpc_core::WaitUntil(load_report_cond_, &load_report_mu_,
[this] { return !result_queue_.empty(); });
load_report_cond_ = nullptr;
}
std::vector<ClientStats> result = std::move(result_queue_.front());
@ -1275,8 +1277,8 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
}
// Wait until notified done.
grpc_core::MutexLock lock(&parent_->lrs_mu_);
parent_->lrs_cv_.WaitUntil(&parent_->lrs_mu_,
[this] { return parent_->lrs_done_; });
grpc_core::WaitUntil(&parent_->lrs_cv_, &parent_->lrs_mu_,
[this] { return parent_->lrs_done_; });
}
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
return Status::OK;
@ -1289,7 +1291,7 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
void NotifyDoneWithLrsCallLocked() {
if (!lrs_done_) {
lrs_done_ = true;
lrs_cv_.Broadcast();
lrs_cv_.SignalAll();
}
}

Loading…
Cancel
Save