Revert "Made grpc_core::Mutex compatible to absl::Mutex" (#25537)

* Revert "Made grpc_core::Mutex compatible to absl::Mutex"

This reverts commit 0b53341328.

* Revert "Added thread annotation (#25486)"

This reverts commit 9d897cb1a5.

* Revert "Fix a race in resolve_address and resolve_address_posix tests"

This reverts commit 6d8e7d3819.

* Restore 25398

* restore 25398
pull/25531/head
donnadionne 4 years ago committed by GitHub
parent 909cac3866
commit 8e268dcec8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/core/ext/filters/client_channel/client_channel.cc
  2. 4
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  3. 4
      src/core/ext/filters/client_channel/subchannel.cc
  4. 67
      src/core/ext/filters/max_age/max_age_filter.cc
  5. 7
      src/core/lib/channel/handshaker.cc
  6. 2
      src/core/lib/channel/handshaker.h
  7. 4
      src/core/lib/gprpp/mpscq.cc
  8. 169
      src/core/lib/gprpp/sync.h
  9. 17
      src/core/lib/iomgr/ev_apple.cc
  10. 8
      src/core/lib/iomgr/ev_epollex_linux.cc
  11. 13
      src/core/lib/security/credentials/google_default/google_default_credentials.cc
  12. 4
      src/core/lib/security/transport/security_handshaker.cc
  13. 2
      src/core/lib/slice/slice_intern.cc
  14. 6
      src/core/lib/surface/channel.h
  15. 28
      src/core/lib/surface/init.cc
  16. 2
      src/core/lib/surface/server.cc
  17. 2
      src/core/lib/transport/metadata.cc
  18. 37
      src/core/tsi/alts/handshaker/alts_handshaker_client.cc
  19. 37
      src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
  20. 2
      src/core/tsi/ssl/session_cache/ssl_session_cache.cc
  21. 3
      src/core/tsi/ssl/session_cache/ssl_session_cache.h
  22. 19
      src/cpp/common/completion_queue_cc.cc
  23. 4
      src/cpp/server/dynamic_thread_pool.cc
  24. 2
      src/cpp/server/load_reporter/load_reporter.cc
  25. 8
      src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
  26. 10
      src/cpp/thread_manager/thread_manager.cc
  27. 2
      test/core/handshake/server_ssl_common.cc
  28. 6
      test/core/iomgr/resolve_address_posix_test.cc
  29. 10
      test/core/iomgr/resolve_address_test.cc
  30. 6
      test/core/iomgr/stranded_event_test.cc
  31. 5
      test/cpp/common/time_jump_test.cc
  32. 20
      test/cpp/end2end/xds_end2end_test.cc

@ -389,7 +389,7 @@ class ChannelData {
// Fields guarded by a mutex, since they need to be accessed
// synchronously via get_channel_info().
//
Mutex info_mu_;
gpr_mu info_mu_;
UniquePtr<char> info_lb_policy_name_;
UniquePtr<char> info_service_config_json_;
@ -1885,6 +1885,8 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
this, owning_stack_);
}
// Initialize data members.
gpr_mu_init(&info_mu_);
// Start backup polling.
grpc_client_channel_start_backup_polling(interested_parties_);
// Check client channel factory.
@ -1953,6 +1955,7 @@ ChannelData::~ChannelData() {
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
gpr_mu_destroy(&info_mu_);
}
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(

@ -151,7 +151,7 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) {
// If the write failed or we're shutting down, clean up and invoke the
// callback with the error.
handshaker->HandshakeFailedLocked(GRPC_ERROR_REF(error));
lock.Release();
lock.Unlock();
handshaker->Unref();
} else {
// Otherwise, read the response.
@ -256,7 +256,7 @@ done:
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
handshaker->is_shutdown_ = true;
lock.Release();
lock.Unlock();
handshaker->Unref();
}

@ -1023,9 +1023,9 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->ContinueConnectingLocked();
lock.Release();
lock.Unlock();
} else {
lock.Release();
lock.Unlock();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
}
GRPC_ERROR_UNREF(error);

@ -54,16 +54,16 @@ struct channel_data {
grpc_channel_stack* channel_stack;
/* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
and max_age_grace_timer_pending */
grpc_core::Mutex max_age_timer_mu;
gpr_mu max_age_timer_mu;
/* True if the max_age timer callback is currently pending */
bool max_age_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false;
bool max_age_timer_pending;
/* True if the max_age_grace timer callback is currently pending */
bool max_age_grace_timer_pending ABSL_GUARDED_BY(max_age_timer_mu) = false;
bool max_age_grace_timer_pending;
/* The timer for checking if the channel has reached its max age */
grpc_timer max_age_timer ABSL_GUARDED_BY(max_age_timer_mu);
grpc_timer max_age_timer;
/* The timer for checking if the max-aged channel has uesed up the grace
period */
grpc_timer max_age_grace_timer ABSL_GUARDED_BY(max_age_timer_mu);
grpc_timer max_age_grace_timer;
/* The timer for checking if the channel's idle duration reaches
max_connection_idle */
grpc_timer max_idle_timer;
@ -260,15 +260,13 @@ class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
static void start_max_age_timer_after_init(void* arg, grpc_error* /*error*/) {
channel_data* chand = static_cast<channel_data*>(arg);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
grpc_timer_init(
&chand->max_age_timer,
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age,
&chand->close_max_age_channel);
}
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
grpc_timer_init(&chand->max_age_timer,
grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age,
&chand->close_max_age_channel);
gpr_mu_unlock(&chand->max_age_timer_mu);
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand));
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
@ -280,17 +278,16 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* /*error*/) {
static void start_max_age_grace_timer_after_goaway_op(void* arg,
grpc_error* /*error*/) {
channel_data* chand = static_cast<channel_data*>(arg);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
grpc_timer_init(&chand->max_age_grace_timer,
chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE
? GRPC_MILLIS_INF_FUTURE
: grpc_core::ExecCtx::Get()->Now() +
chand->max_connection_age_grace,
&chand->force_close_max_age_channel);
}
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
grpc_timer_init(
&chand->max_age_grace_timer,
chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE
? GRPC_MILLIS_INF_FUTURE
: grpc_core::ExecCtx::Get()->Now() + chand->max_connection_age_grace,
&chand->force_close_max_age_channel);
gpr_mu_unlock(&chand->max_age_timer_mu);
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack,
"max_age start_max_age_grace_timer_after_goaway_op");
}
@ -353,10 +350,9 @@ static void max_idle_timer_cb(void* arg, grpc_error* error) {
static void close_max_age_channel(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
}
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
if (error == GRPC_ERROR_NONE) {
GRPC_CHANNEL_STACK_REF(chand->channel_stack,
"max_age start_max_age_grace_timer_after_goaway_op");
@ -376,10 +372,9 @@ static void close_max_age_channel(void* arg, grpc_error* error) {
static void force_close_max_age_channel(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
{
grpc_core::MutexLock lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = false;
}
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
if (error == GRPC_ERROR_NONE) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error =
@ -431,7 +426,9 @@ static void max_age_destroy_call_elem(
static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data();
gpr_mu_init(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
chand->max_age_grace_timer_pending = false;
chand->channel_stack = args->channel_stack;
chand->max_connection_age =
add_random_max_connection_age_jitter_and_convert_to_grpc_millis(
@ -516,7 +513,7 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
/* Destructor for channel_data. */
static void max_age_destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
chand->~channel_data();
gpr_mu_destroy(&chand->max_age_timer_mu);
}
const grpc_channel_filter grpc_max_age_filter = {

@ -53,7 +53,7 @@ std::string HandshakerArgsString(HandshakerArgs* args) {
} // namespace
HandshakeManager::HandshakeManager() {}
HandshakeManager::HandshakeManager() { gpr_mu_init(&mu_); }
/// Add \a mgr to the server side list of all pending handshake managers, the
/// list starts with \a *head.
@ -104,7 +104,10 @@ void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
handshakers_.push_back(std::move(handshaker));
}
HandshakeManager::~HandshakeManager() { handshakers_.clear(); }
HandshakeManager::~HandshakeManager() {
handshakers_.clear();
gpr_mu_destroy(&mu_);
}
void HandshakeManager::Shutdown(grpc_error* why) {
{

@ -144,7 +144,7 @@ class HandshakeManager : public RefCounted<HandshakeManager> {
static const size_t HANDSHAKERS_INIT_SIZE = 2;
Mutex mu_;
gpr_mu mu_;
bool is_shutdown_ = false;
// An array of handshakers added via grpc_handshake_manager_add().
absl::InlinedVector<RefCountedPtr<Handshaker>, HANDSHAKERS_INIT_SIZE>

@ -86,9 +86,9 @@ bool LockedMultiProducerSingleConsumerQueue::Push(Node* node) {
LockedMultiProducerSingleConsumerQueue::Node*
LockedMultiProducerSingleConsumerQueue::TryPop() {
if (mu_.TryLock()) {
if (gpr_mu_trylock(mu_.get())) {
Node* node = queue_.Pop();
mu_.Unlock();
gpr_mu_unlock(mu_.get());
return node;
}
return nullptr;

@ -26,9 +26,6 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "absl/synchronization/mutex.h"
#include "src/core/lib/gprpp/time_util.h"
// The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in
@ -40,23 +37,7 @@
namespace grpc_core {
#ifdef GPR_ABSEIL_SYNC
using Mutex = absl::Mutex;
using MutexLock = absl::MutexLock;
using ReleasableMutexLock = absl::ReleasableMutexLock;
using CondVar = absl::CondVar;
// Returns the underlying gpr_mu from Mutex. This should be used only when
// it has to like passing the C++ mutex to C-core API.
// TODO(veblush): Remove this after C-core no longer uses gpr_mu.
inline gpr_mu* GetUnderlyingGprMu(Mutex* mutex) {
return reinterpret_cast<gpr_mu*>(mutex);
}
#else
class ABSL_LOCKABLE Mutex {
class Mutex {
public:
Mutex() { gpr_mu_init(&mu_); }
~Mutex() { gpr_mu_destroy(&mu_); }
@ -64,59 +45,52 @@ class ABSL_LOCKABLE Mutex {
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { gpr_mu_lock(&mu_); }
void Unlock() ABSL_UNLOCK_FUNCTION() { gpr_mu_unlock(&mu_); }
bool TryLock() ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(true) {
return gpr_mu_trylock(&mu_) != 0;
}
gpr_mu* get() { return &mu_; }
const gpr_mu* get() const { return &mu_; }
private:
gpr_mu mu_;
friend class CondVar;
friend gpr_mu* GetUnderlyingGprMu(Mutex* mutex);
};
// Returns the underlying gpr_mu from Mutex. This should be used only when
// it has to like passing the C++ mutex to C-core API.
// TODO(veblush): Remove this after C-core no longer uses gpr_mu.
inline gpr_mu* GetUnderlyingGprMu(Mutex* mutex) { return &mutex->mu_; }
class ABSL_SCOPED_LOCKABLE MutexLock {
// MutexLock is a std::
class MutexLock {
public:
explicit MutexLock(Mutex* mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) : mu_(mu) {
mu_->Lock();
}
~MutexLock() ABSL_UNLOCK_FUNCTION() { mu_->Unlock(); }
explicit MutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~MutexLock() { gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
Mutex* const mu_;
gpr_mu* const mu_;
};
class ABSL_SCOPED_LOCKABLE ReleasableMutexLock {
class ReleasableMutexLock {
public:
explicit ReleasableMutexLock(Mutex* mu) ABSL_EXCLUSIVE_LOCK_FUNCTION(mu)
: mu_(mu) {
mu_->Lock();
}
~ReleasableMutexLock() ABSL_UNLOCK_FUNCTION() {
if (!released_) mu_->Unlock();
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~ReleasableMutexLock() {
if (!released_) gpr_mu_unlock(mu_);
}
ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Release() ABSL_UNLOCK_FUNCTION() {
void Lock() {
GPR_DEBUG_ASSERT(released_);
gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
mu_->Unlock();
gpr_mu_unlock(mu_);
}
private:
Mutex* const mu_;
gpr_mu* const mu_;
bool released_ = false;
};
@ -129,94 +103,31 @@ class CondVar {
CondVar& operator=(const CondVar&) = delete;
void Signal() { gpr_cv_signal(&cv_); }
void SignalAll() { gpr_cv_broadcast(&cv_); }
void Wait(Mutex* mu) { WaitWithDeadline(mu, absl::InfiniteFuture()); }
bool WaitWithTimeout(Mutex* mu, absl::Duration timeout) {
return gpr_cv_wait(&cv_, &mu->mu_, ToGprTimeSpec(timeout)) != 0;
}
bool WaitWithDeadline(Mutex* mu, absl::Time deadline) {
return gpr_cv_wait(&cv_, &mu->mu_, ToGprTimeSpec(deadline)) != 0;
}
private:
gpr_cv cv_;
};
#endif // GPR_ABSEIL_SYNC
template <typename Predicate>
static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) {
while (!pred()) {
cv->Wait(mu);
}
}
// Returns true iff we timed-out
template <typename Predicate>
static bool WaitUntilWithTimeout(CondVar* cv, Mutex* mu, Predicate pred,
absl::Duration timeout) {
while (!pred()) {
if (cv->WaitWithTimeout(mu, timeout)) return true;
}
return false;
}
// Returns true iff we timed-out
template <typename Predicate>
static bool WaitUntilWithDeadline(CondVar* cv, Mutex* mu, Predicate pred,
absl::Time deadline) {
while (!pred()) {
if (cv->WaitWithDeadline(mu, deadline)) return true;
}
return false;
}
// Deprecated. Prefer MutexLock
class MutexLockForGprMu {
public:
explicit MutexLockForGprMu(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~MutexLockForGprMu() { gpr_mu_unlock(mu_); }
MutexLockForGprMu(const MutexLock&) = delete;
MutexLockForGprMu& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
void Broadcast() { gpr_cv_broadcast(&cv_); }
// Deprecated. Prefer MutexLock or ReleasableMutexLock
class ABSL_SCOPED_LOCKABLE LockableAndReleasableMutexLock {
public:
explicit LockableAndReleasableMutexLock(Mutex* mu)
ABSL_EXCLUSIVE_LOCK_FUNCTION(mu)
: mu_(mu) {
mu_->Lock();
int Wait(Mutex* mu) { return Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); }
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return gpr_cv_wait(&cv_, mu->get(), deadline);
}
~LockableAndReleasableMutexLock() ABSL_UNLOCK_FUNCTION() {
if (!released_) mu_->Unlock();
}
LockableAndReleasableMutexLock(const LockableAndReleasableMutexLock&) =
delete;
LockableAndReleasableMutexLock& operator=(
const LockableAndReleasableMutexLock&) = delete;
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() {
GPR_DEBUG_ASSERT(released_);
mu_->Lock();
released_ = false;
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
}
void Release() ABSL_UNLOCK_FUNCTION() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
mu_->Unlock();
// Returns true iff we timed-out
template <typename Predicate>
bool WaitUntil(Mutex* mu, Predicate pred, const gpr_timespec& deadline) {
while (!pred()) {
if (Wait(mu, deadline)) return true;
}
return false;
}
private:
Mutex* const mu_;
bool released_ = false;
gpr_cv cv_;
};
} // namespace grpc_core

@ -33,10 +33,7 @@
#include <list>
#include "absl/time/time.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time_util.h"
#include "src/core/lib/iomgr/ev_apple.h"
grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling");
@ -164,7 +161,7 @@ void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
/// Drive the run loop in a global singleton thread until the global run loop is
/// shutdown.
static void GlobalRunLoopFunc(void* arg) {
grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
grpc_core::ReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
gGlobalRunLoopContext->init_cv.Signal();
@ -176,11 +173,11 @@ static void GlobalRunLoopFunc(void* arg) {
gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
}
gGlobalRunLoopContext->input_source_registered = false;
lock.Release();
lock.Unlock();
CFRunLoopRun();
lock.Lock();
}
lock.Release();
lock.Unlock();
}
// pollset implementation
@ -240,9 +237,9 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
auto it = apple_pollset->workers.begin();
while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
if (actual_worker.cv.WaitWithDeadline(
&apple_pollset->mu, grpc_core::ToAbslTime(grpc_millis_to_timespec(
deadline, GPR_CLOCK_REALTIME)))) {
if (actual_worker.cv.Wait(
&apple_pollset->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
// timed out
break;
}
@ -302,7 +299,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset,
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
GRPC_POLLING_TRACE("pollset init: %p", pollset);
GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
*mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu);
*mu = apple_pollset->mu.get();
}
/// The caller must acquire the lock GrpcApplePollset.mu before calling this

@ -537,7 +537,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) {
const int epfd = pollset->active_pollable->epfd;
grpc_core::MutexLockForGprMu lock(&fd->pollable_mu);
grpc_core::MutexLock lock(&fd->pollable_mu);
for (size_t i = 0; i < fd->pollset_fds.size(); ++i) {
if (fd->pollset_fds[i] == epfd) {
return true;
@ -548,7 +548,7 @@ static bool fd_has_pollset(grpc_fd* fd, grpc_pollset* pollset) {
static void fd_add_pollset(grpc_fd* fd, grpc_pollset* pollset) {
const int epfd = pollset->active_pollable->epfd;
grpc_core::MutexLockForGprMu lock(&fd->pollable_mu);
grpc_core::MutexLock lock(&fd->pollable_mu);
fd->pollset_fds.push_back(epfd);
}
@ -684,7 +684,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
grpc_core::MutexLockForGprMu lock(&p->mu);
grpc_core::MutexLock lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
@ -1296,7 +1296,7 @@ static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
return;
}
grpc_core::MutexLockForGprMu lock(&pollset->mu);
grpc_core::MutexLock lock(&pollset->mu);
grpc_error* error = pollset_add_fd_locked(pollset, fd);
// If we are in PO_MULTI mode, we should update the pollsets of the FD.

@ -61,7 +61,7 @@ using grpc_core::Json;
* means the detection is done via network test that is unreliable and the
* unreliable result should not be referred by successive calls. */
static int g_metadata_server_available = 0;
static grpc_core::Mutex* g_state_mu;
static gpr_mu g_state_mu;
/* Protect a metadata_server_detector instance that can be modified by more than
* one gRPC threads */
static gpr_mu* g_polling_mu;
@ -69,9 +69,7 @@ static gpr_once g_once = GPR_ONCE_INIT;
static grpc_core::internal::grpc_gce_tenancy_checker g_gce_tenancy_checker =
grpc_alts_is_running_on_gcp;
static void init_default_credentials(void) {
g_state_mu = new grpc_core::Mutex();
}
static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); }
struct metadata_server_detector {
grpc_polling_entity pollent;
@ -284,7 +282,7 @@ end:
static void update_tenancy() {
gpr_once_init(&g_once, init_default_credentials);
grpc_core::MutexLock lock(g_state_mu);
grpc_core::MutexLock lock(&g_state_mu);
/* Try a platform-provided hint for GCE. */
if (!g_metadata_server_available) {
@ -299,7 +297,7 @@ static void update_tenancy() {
}
static bool metadata_server_available() {
grpc_core::MutexLock lock(g_state_mu);
grpc_core::MutexLock lock(&g_state_mu);
return static_cast<bool>(g_metadata_server_available);
}
@ -389,8 +387,9 @@ void set_gce_tenancy_checker_for_testing(grpc_gce_tenancy_checker checker) {
void grpc_flush_cached_google_default_credentials(void) {
grpc_core::ExecCtx exec_ctx;
gpr_once_init(&g_once, init_default_credentials);
grpc_core::MutexLock lock(g_state_mu);
gpr_mu_lock(&g_state_mu);
g_metadata_server_available = 0;
gpr_mu_unlock(&g_state_mu);
}
} // namespace internal

@ -84,7 +84,7 @@ class SecurityHandshaker : public Handshaker {
tsi_handshaker* handshaker_;
RefCountedPtr<grpc_security_connector> connector_;
Mutex mu_;
gpr_mu mu_;
bool is_shutdown_ = false;
// Endpoint and read buffer to destroy after a shutdown.
@ -120,12 +120,14 @@ SecurityHandshaker::SecurityHandshaker(tsi_handshaker* handshaker,
max_frame_size_ = grpc_channel_arg_get_integer(
arg, {0, 0, std::numeric_limits<int>::max()});
}
gpr_mu_init(&mu_);
grpc_slice_buffer_init(&outgoing_);
GRPC_CLOSURE_INIT(&on_peer_checked_, &SecurityHandshaker::OnPeerCheckedFn,
this, grpc_schedule_on_exec_ctx);
}
SecurityHandshaker::~SecurityHandshaker() {
gpr_mu_destroy(&mu_);
tsi_handshaker_destroy(handshaker_);
tsi_handshaker_result_destroy(handshaker_result_);
if (endpoint_to_destroy_ != nullptr) {

@ -69,7 +69,7 @@ static bool g_forced_hash_seed = false;
InternedSliceRefcount::~InternedSliceRefcount() {
slice_shard* shard = &g_shards[SHARD_IDX(this->hash)];
MutexLockForGprMu lock(&shard->mu);
MutexLock lock(&shard->mu);
InternedSliceRefcount** prev_next;
InternedSliceRefcount* cur;
for (prev_next = &shard->strs[TABLE_IDX(this->hash, shard->capacity)],

@ -94,9 +94,9 @@ struct CallRegistrationTable {
// The map key should be owned strings rather than unowned char*'s to
// guarantee that it outlives calls on the core channel (which may outlast the
// C++ or other wrapped language Channel that registered these calls).
std::map<std::pair<std::string, std::string>, RegisteredCall> map
ABSL_GUARDED_BY(mu);
int method_registration_attempts ABSL_GUARDED_BY(mu) = 0;
std::map<std::pair<std::string, std::string>, RegisteredCall>
map /* GUARDED_BY(mu) */;
int method_registration_attempts /* GUARDED_BY(mu) */ = 0;
};
} // namespace grpc_core

@ -63,15 +63,16 @@ extern void grpc_register_built_in_plugins(void);
#define MAX_PLUGINS 128
static gpr_once g_basic_init = GPR_ONCE_INIT;
static grpc_core::Mutex* g_init_mu;
static gpr_mu g_init_mu;
static int g_initializations;
static grpc_core::CondVar* g_shutting_down_cv;
static gpr_cv* g_shutting_down_cv;
static bool g_shutting_down;
static void do_basic_init(void) {
gpr_log_verbosity_init();
g_init_mu = new grpc_core::Mutex();
g_shutting_down_cv = new grpc_core::CondVar();
gpr_mu_init(&g_init_mu);
g_shutting_down_cv = static_cast<gpr_cv*>(malloc(sizeof(gpr_cv)));
gpr_cv_init(g_shutting_down_cv);
g_shutting_down = false;
grpc_register_built_in_plugins();
grpc_cq_global_init();
@ -129,11 +130,11 @@ void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
if (++g_initializations == 1) {
if (g_shutting_down) {
g_shutting_down = false;
g_shutting_down_cv->SignalAll();
gpr_cv_broadcast(g_shutting_down_cv);
}
grpc_core::Fork::GlobalInit();
grpc_fork_handlers_auto_register();
@ -195,14 +196,14 @@ void grpc_shutdown_internal_locked(void) {
grpc_core::ExecCtx::GlobalShutdown();
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();
g_shutting_down = false;
g_shutting_down_cv->SignalAll();
gpr_cv_broadcast(g_shutting_down_cv);
// Absolute last action will be to delete static metadata context.
grpc_destroy_static_metadata_ctx();
}
void grpc_shutdown_internal(void* /*ignored*/) {
GRPC_API_TRACE("grpc_shutdown_internal", 0, ());
grpc_core::MutexLock lock(g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
// We have released lock from the shutdown thread and it is possible that
// another grpc_init has been called, and do nothing if that is the case.
if (--g_initializations != 0) {
@ -213,7 +214,7 @@ void grpc_shutdown_internal(void* /*ignored*/) {
void grpc_shutdown(void) {
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
grpc_core::MutexLock lock(g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
if (--g_initializations == 0) {
grpc_core::ApplicationCallbackExecCtx* acec =
@ -242,7 +243,7 @@ void grpc_shutdown(void) {
void grpc_shutdown_blocking(void) {
GRPC_API_TRACE("grpc_shutdown_blocking(void)", 0, ());
grpc_core::MutexLock lock(g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
if (--g_initializations == 0) {
g_shutting_down = true;
grpc_shutdown_internal_locked();
@ -252,15 +253,16 @@ void grpc_shutdown_blocking(void) {
int grpc_is_initialized(void) {
int r;
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
r = g_initializations > 0;
return r;
}
void grpc_maybe_wait_for_async_shutdown(void) {
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
while (g_shutting_down) {
g_shutting_down_cv->Wait(g_init_mu);
gpr_cv_wait(g_shutting_down_cv, &g_init_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
}

@ -794,7 +794,7 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
{
// Wait for startup to be finished. Locks mu_global.
MutexLock lock(&mu_global_);
WaitUntil(&starting_cv_, &mu_global_, [this] { return !starting_; });
starting_cv_.WaitUntil(&mu_global_, [this] { return !starting_; });
// Stay locked, and gather up some stuff to do.
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (shutdown_published_) {

@ -596,7 +596,7 @@ static void* set_user_data(UserData* ud, void (*destroy_func)(void*),
grpc_core::ReleasableMutexLock lock(&ud->mu_user_data);
if (ud->destroy_user_data.Load(grpc_core::MemoryOrder::RELAXED)) {
/* user data can only be set once */
lock.Release();
lock.Unlock();
if (destroy_func != nullptr) {
destroy_func(data);
}

@ -69,9 +69,9 @@ typedef struct alts_grpc_handshaker_client {
grpc_closure on_handshaker_service_resp_recv;
/* Buffers containing information to be sent (or received) to (or from) the
* handshaker service. */
grpc_byte_buffer* send_buffer = nullptr;
grpc_byte_buffer* recv_buffer = nullptr;
grpc_status_code status = GRPC_STATUS_OK;
grpc_byte_buffer* send_buffer;
grpc_byte_buffer* recv_buffer;
grpc_status_code status;
/* Initial metadata to be received from handshaker service. */
grpc_metadata_array recv_initial_metadata;
/* A callback function provided by an application to be invoked when response
@ -95,15 +95,15 @@ typedef struct alts_grpc_handshaker_client {
/** callback for receiving handshake call status */
grpc_closure on_status_received;
/** gRPC status code of handshake call */
grpc_status_code handshake_status_code = GRPC_STATUS_OK;
grpc_status_code handshake_status_code;
/** gRPC status details of handshake call */
grpc_slice handshake_status_details;
/* mu synchronizes all fields below including their internal fields. */
grpc_core::Mutex mu;
gpr_mu mu;
/* indicates if the handshaker call's RECV_STATUS_ON_CLIENT op is done. */
bool receive_status_finished = false;
bool receive_status_finished;
/* if non-null, contains arguments to complete a TSI next callback. */
recv_message_result* pending_recv_message_result = nullptr;
recv_message_result* pending_recv_message_result;
/* Maximum frame size used by frame protector. */
size_t max_frame_size;
} alts_grpc_handshaker_client;
@ -140,7 +140,8 @@ static void alts_grpc_handshaker_client_unref(
grpc_alts_credentials_options_destroy(client->options);
gpr_free(client->buffer);
grpc_slice_unref_internal(client->handshake_status_details);
delete client;
gpr_mu_destroy(&client->mu);
gpr_free(client);
}
}
@ -694,24 +695,24 @@ alts_handshaker_client* alts_grpc_handshaker_client_create(
gpr_log(GPR_ERROR, "Invalid arguments to alts_handshaker_client_create()");
return nullptr;
}
alts_grpc_handshaker_client* client = new alts_grpc_handshaker_client();
memset(&client->base, 0, sizeof(client->base));
client->base.vtable =
vtable_for_testing == nullptr ? &vtable : vtable_for_testing;
alts_grpc_handshaker_client* client =
static_cast<alts_grpc_handshaker_client*>(gpr_zalloc(sizeof(*client)));
gpr_mu_init(&client->mu);
gpr_ref_init(&client->refs, 1);
client->handshaker = handshaker;
client->grpc_caller = grpc_call_start_batch_and_execute;
grpc_metadata_array_init(&client->recv_initial_metadata);
client->handshaker = handshaker;
client->cb = cb;
client->user_data = user_data;
client->send_buffer = nullptr;
client->recv_buffer = nullptr;
client->options = grpc_alts_credentials_options_copy(options);
client->target_name = grpc_slice_copy(target_name);
client->is_client = is_client;
client->recv_bytes = grpc_empty_slice();
grpc_metadata_array_init(&client->recv_initial_metadata);
client->is_client = is_client;
client->max_frame_size = max_frame_size;
client->buffer_size = TSI_ALTS_INITIAL_BUFFER_SIZE;
client->buffer = static_cast<unsigned char*>(gpr_zalloc(client->buffer_size));
client->handshake_status_details = grpc_empty_slice();
client->max_frame_size = max_frame_size;
grpc_slice slice = grpc_slice_from_copied_string(handshaker_service_url);
client->call =
strcmp(handshaker_service_url, ALTS_HANDSHAKER_SERVICE_URL_FOR_TESTING) ==
@ -721,6 +722,8 @@ alts_handshaker_client* alts_grpc_handshaker_client_create(
channel, nullptr, GRPC_PROPAGATE_DEFAULTS, interested_parties,
grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice,
GRPC_MILLIS_INF_FUTURE, nullptr);
client->base.vtable =
vtable_for_testing == nullptr ? &vtable : vtable_for_testing;
GRPC_CLOSURE_INIT(&client->on_handshaker_service_resp_recv, grpc_cb, client,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&client->on_status_received, on_status_received, client,

@ -48,23 +48,23 @@ struct alts_tsi_handshaker {
tsi_handshaker base;
grpc_slice target_name;
bool is_client;
bool has_sent_start_message = false;
bool has_created_handshaker_client = false;
bool has_sent_start_message;
bool has_created_handshaker_client;
char* handshaker_service_url;
grpc_pollset_set* interested_parties;
grpc_alts_credentials_options* options;
alts_handshaker_client_vtable* client_vtable_for_testing = nullptr;
grpc_channel* channel = nullptr;
alts_handshaker_client_vtable* client_vtable_for_testing;
grpc_channel* channel;
bool use_dedicated_cq;
// mu synchronizes all fields below. Note these are the
// only fields that can be concurrently accessed (due to
// potential concurrency of tsi_handshaker_shutdown and
// tsi_handshaker_next).
grpc_core::Mutex mu;
alts_handshaker_client* client = nullptr;
gpr_mu mu;
alts_handshaker_client* client;
// shutdown effectively follows base.handshake_shutdown,
// but is synchronized by the mutex of this object.
bool shutdown = false;
bool shutdown;
// Maximum frame size used by frame protector.
size_t max_frame_size;
};
@ -592,7 +592,8 @@ static void handshaker_destroy(tsi_handshaker* self) {
grpc_channel_destroy_internal(handshaker->channel);
}
gpr_free(handshaker->handshaker_service_url);
delete handshaker;
gpr_mu_destroy(&handshaker->mu);
gpr_free(handshaker);
}
static const tsi_handshaker_vtable handshaker_vtable = {
@ -627,22 +628,26 @@ tsi_result alts_tsi_handshaker_create(
gpr_log(GPR_ERROR, "Invalid arguments to alts_tsi_handshaker_create()");
return TSI_INVALID_ARGUMENT;
}
bool use_dedicated_cq = interested_parties == nullptr;
alts_tsi_handshaker* handshaker = new alts_tsi_handshaker();
memset(&handshaker->base, 0, sizeof(handshaker->base));
handshaker->base.vtable =
use_dedicated_cq ? &handshaker_vtable_dedicated : &handshaker_vtable;
alts_tsi_handshaker* handshaker =
static_cast<alts_tsi_handshaker*>(gpr_zalloc(sizeof(*handshaker)));
gpr_mu_init(&handshaker->mu);
handshaker->use_dedicated_cq = interested_parties == nullptr;
handshaker->client = nullptr;
handshaker->is_client = is_client;
handshaker->has_sent_start_message = false;
handshaker->target_name = target_name == nullptr
? grpc_empty_slice()
: grpc_slice_from_static_string(target_name);
handshaker->is_client = is_client;
handshaker->handshaker_service_url = gpr_strdup(handshaker_service_url);
handshaker->interested_parties = interested_parties;
handshaker->has_created_handshaker_client = false;
handshaker->handshaker_service_url = gpr_strdup(handshaker_service_url);
handshaker->options = grpc_alts_credentials_options_copy(options);
handshaker->use_dedicated_cq = use_dedicated_cq;
handshaker->max_frame_size = user_specified_max_frame_size != 0
? user_specified_max_frame_size
: kTsiAltsMaxFrameSize;
handshaker->base.vtable = handshaker->use_dedicated_cq
? &handshaker_vtable_dedicated
: &handshaker_vtable;
*self = &handshaker->base;
return TSI_OK;
}

@ -84,6 +84,7 @@ class SslSessionLRUCache::Node {
SslSessionLRUCache::SslSessionLRUCache(size_t capacity) : capacity_(capacity) {
GPR_ASSERT(capacity > 0);
gpr_mu_init(&lock_);
entry_by_key_ = grpc_avl_create(&cache_avl_vtable);
}
@ -95,6 +96,7 @@ SslSessionLRUCache::~SslSessionLRUCache() {
node = next;
}
grpc_avl_unref(entry_by_key_, nullptr);
gpr_mu_destroy(&lock_);
}
size_t SslSessionLRUCache::Size() {

@ -31,7 +31,6 @@ extern "C" {
#include "src/core/lib/avl/avl.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/tsi/ssl/session_cache/ssl_session.h"
/// Cache for SSL sessions for sessions resumption.
@ -77,7 +76,7 @@ class SslSessionLRUCache : public grpc_core::RefCounted<SslSessionLRUCache> {
void PushFront(Node* node);
void AssertInvariants();
grpc_core::Mutex lock_;
gpr_mu lock_;
size_t capacity_;
Node* use_order_list_head_ = nullptr;

@ -36,17 +36,17 @@ namespace {
internal::GrpcLibraryInitializer g_gli_initializer;
gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
grpc_core::Mutex* g_callback_alternative_mu;
grpc_core::ManualConstructor<grpc_core::Mutex> g_callback_alternative_mu;
// Implement a ref-counted callback CQ for global use in the alternative
// implementation so that its threads are only created once. Do this using
// explicit ref-counts and raw pointers rather than a shared-ptr since that
// has a non-trivial destructor and thus can't be used for global variables.
struct CallbackAlternativeCQ {
int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
std::vector<grpc_core::Thread>* nexting_threads
ABSL_GUARDED_BY(g_callback_alternative_mu);
int refs = 0; // GUARDED_BY(g_callback_alternative_mu);
CompletionQueue* cq; // GUARDED_BY(g_callback_alternative_mu);
std::vector<grpc_core::Thread>*
nexting_threads; // GUARDED_BY(g_callback_alternative_mu);
CompletionQueue* Ref() {
grpc_core::MutexLock lock(&*g_callback_alternative_mu);
@ -104,7 +104,7 @@ struct CallbackAlternativeCQ {
}
void Unref() {
grpc_core::MutexLock lock(g_callback_alternative_mu);
grpc_core::MutexLock lock(&*g_callback_alternative_mu);
refs--;
if (refs == 0) {
cq->Shutdown();
@ -191,15 +191,12 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
gpr_once_init(&g_once_init_callback_alternative,
[] { g_callback_alternative_mu = new grpc_core::Mutex(); });
[] { g_callback_alternative_mu.Init(); });
return g_callback_alternative_cq.Ref();
}
void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
ABSL_NO_THREAD_SAFETY_ANALYSIS {
void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq) {
(void)cq;
// This accesses g_callback_alternative_cq without acquiring the mutex
// but it's considered safe because it just reads the pointer address.
GPR_DEBUG_ASSERT(cq == g_callback_alternative_cq.cq);
g_callback_alternative_cq.Unref();
}

@ -68,7 +68,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!callbacks_.empty()) {
auto cb = callbacks_.front();
callbacks_.pop();
lock.Release();
lock.Unlock();
cb();
} else if (shutdown_) {
break;
@ -97,7 +97,7 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
DynamicThreadPool::~DynamicThreadPool() {
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_.SignalAll();
cv_.Broadcast();
while (nthreads_ != 0) {
shutdown_cv_.Wait(&mu_);
}

@ -279,7 +279,7 @@ LoadReporter::GenerateLoadBalancingFeedback() {
double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
std::chrono::duration<double> duration_seconds =
newest->end_time - oldest->end_time;
lock.Release();
lock.Unlock();
::grpc::lb::v1::LoadBalancingFeedback feedback;
feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
feedback.set_calls_per_second(

@ -171,7 +171,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Release();
lock.Unlock();
Shutdown(std::move(self), "OnRequestDelivered");
return;
}
@ -229,7 +229,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Release();
lock.Unlock();
Shutdown(std::move(self), "OnReadDone");
return;
}
@ -261,7 +261,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Release();
lock.Unlock();
Shutdown(std::move(self), "ScheduleNextReport");
return;
}
@ -301,7 +301,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
{
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.Release();
lock.Unlock();
Shutdown(std::move(self), "SendReport");
return;
}

@ -152,7 +152,7 @@ void ThreadManager::MainWorkLoop() {
bool ok;
WorkStatus work_status = PollForWork(&tag, &ok);
grpc_core::LockableAndReleasableMutexLock lock(&mu_);
grpc_core::ReleasableMutexLock lock(&mu_);
// Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--;
bool done = false;
@ -179,7 +179,7 @@ void ThreadManager::MainWorkLoop() {
max_active_threads_sofar_ = num_threads_;
}
// Drop lock before spawning thread to avoid contention
lock.Release();
lock.Unlock();
WorkerThread* worker = new WorkerThread(this);
if (worker->created()) {
worker->Start();
@ -195,17 +195,17 @@ void ThreadManager::MainWorkLoop() {
// There is still at least some thread polling, so we can go on
// even though we are below the number of pollers that we would
// like to have (min_pollers_)
lock.Release();
lock.Unlock();
} else {
// There are no pollers to spare and we couldn't allocate
// a new thread, so resources are exhausted!
lock.Release();
lock.Unlock();
resource_exhausted = true;
}
} else {
// There are a sufficient number of pollers available so we can do
// the work and continue polling with our existing poller threads
lock.Release();
lock.Unlock();
}
// Lock is always released at this point - do the application work
// or return resource exhausted if there is new work but we couldn't

@ -86,7 +86,7 @@ class ServerInfo {
void Await() {
grpc_core::MutexLock lock(&mu_);
grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; });
cv_.WaitUntil(&mu_, [this] { return ready_; });
}
private:

@ -97,7 +97,7 @@ static void actually_poll(void* argsp) {
while (true) {
grpc_core::ExecCtx exec_ctx;
{
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
if (args->done) {
break;
}
@ -123,7 +123,7 @@ static void must_succeed(void* argsp, grpc_error* err) {
GPR_ASSERT(err == GRPC_ERROR_NONE);
GPR_ASSERT(args->addrs != nullptr);
GPR_ASSERT(args->addrs->naddrs > 0);
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
args->done = true;
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
}
@ -131,7 +131,7 @@ static void must_succeed(void* argsp, grpc_error* err) {
static void must_fail(void* argsp, grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err != GRPC_ERROR_NONE);
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
args->done = true;
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
}

@ -89,7 +89,7 @@ static void poll_pollset_until_request_done(args_struct* args) {
while (true) {
grpc_core::ExecCtx exec_ctx;
{
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
if (args->done) {
break;
}
@ -110,7 +110,7 @@ static void must_succeed(void* argsp, grpc_error* err) {
GPR_ASSERT(err == GRPC_ERROR_NONE);
GPR_ASSERT(args->addrs != nullptr);
GPR_ASSERT(args->addrs->naddrs > 0);
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
args->done = true;
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
}
@ -118,7 +118,7 @@ static void must_succeed(void* argsp, grpc_error* err) {
static void must_fail(void* argsp, grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err != GRPC_ERROR_NONE);
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
args->done = true;
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
}
@ -132,7 +132,7 @@ static void must_succeed_with_ipv6_first(void* argsp, grpc_error* err) {
const struct sockaddr* first_address =
reinterpret_cast<const struct sockaddr*>(args->addrs->addrs[0].addr);
GPR_ASSERT(first_address->sa_family == AF_INET6);
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
args->done = true;
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
}
@ -145,7 +145,7 @@ static void must_succeed_with_ipv4_first(void* argsp, grpc_error* err) {
const struct sockaddr* first_address =
reinterpret_cast<const struct sockaddr*>(args->addrs->addrs[0].addr);
GPR_ASSERT(first_address->sa_family == AF_INET);
grpc_core::MutexLockForGprMu lock(args->mu);
grpc_core::MutexLock lock(args->mu);
args->done = true;
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
}

@ -394,7 +394,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
for (int i = 1; i <= kNumMessagePingPongsPerCall; i++) {
{
grpc_core::MutexLock lock(&ping_pong_round_mu);
ping_pong_round_cv.SignalAll();
ping_pong_round_cv.Broadcast();
while (int(ping_pong_round) != i) {
ping_pong_round_cv.Wait(&ping_pong_round_mu);
}
@ -404,7 +404,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
{
grpc_core::MutexLock lock(&ping_pong_round_mu);
ping_pongs_done++;
ping_pong_round_cv.SignalAll();
ping_pong_round_cv.Broadcast();
}
}
gpr_log(GPR_DEBUG, "now receive status on call with server address:%s",
@ -425,7 +425,7 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
ping_pong_round_cv.Wait(&ping_pong_round_mu);
}
ping_pong_round++;
ping_pong_round_cv.SignalAll();
ping_pong_round_cv.Broadcast();
gpr_log(GPR_DEBUG, "initiate ping pong round: %ld", ping_pong_round);
}
}

@ -34,8 +34,6 @@
#include "src/core/lib/iomgr/timer_manager.h"
#include "test/core/util/test_config.h"
#include "absl/time/time.h"
extern char** environ;
#ifdef GPR_ANDROID
@ -118,7 +116,8 @@ TEST_P(TimeJumpTest, TimedWait) {
run_cmd(cmd.str().c_str());
});
gpr_timespec before = gpr_now(GPR_CLOCK_MONOTONIC);
bool timedout = cond.WaitWithTimeout(&mu, absl::Milliseconds(kWaitTimeMs));
int timedout = cond.Wait(
&mu, grpc_millis_to_timespec(kWaitTimeMs, GPR_CLOCK_REALTIME));
gpr_timespec after = gpr_now(GPR_CLOCK_MONOTONIC);
int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(after, before));
gpr_log(GPR_DEBUG, "After wait, timedout = %d elapsed_ms = %d", timedout,

@ -62,7 +62,6 @@
#include "src/core/lib/gpr/tmpfile.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time_util.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/iomgr/parse_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -642,7 +641,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
void NotifyDoneWithAdsCallLocked() {
if (!ads_done_) {
ads_done_ = true;
ads_cond_.SignalAll();
ads_cond_.Broadcast();
}
}
@ -795,10 +794,9 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10);
{
grpc_core::MutexLock lock(&parent_->ads_mu_);
if (!grpc_core::WaitUntilWithDeadline(
&parent_->ads_cond_, &parent_->ads_mu_,
[this] { return parent_->ads_done_; },
grpc_core::ToAbslTime(deadline))) {
if (!parent_->ads_cond_.WaitUntil(
&parent_->ads_mu_, [this] { return parent_->ads_done_; },
deadline)) {
break;
}
}
@ -1210,8 +1208,8 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
grpc_core::CondVar cv;
if (result_queue_.empty()) {
load_report_cond_ = &cv;
grpc_core::WaitUntil(load_report_cond_, &load_report_mu_,
[this] { return !result_queue_.empty(); });
load_report_cond_->WaitUntil(&load_report_mu_,
[this] { return !result_queue_.empty(); });
load_report_cond_ = nullptr;
}
std::vector<ClientStats> result = std::move(result_queue_.front());
@ -1278,8 +1276,8 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
}
// Wait until notified done.
grpc_core::MutexLock lock(&parent_->lrs_mu_);
grpc_core::WaitUntil(&parent_->lrs_cv_, &parent_->lrs_mu_,
[this] { return parent_->lrs_done_; });
parent_->lrs_cv_.WaitUntil(&parent_->lrs_mu_,
[this] { return parent_->lrs_done_; });
}
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
return Status::OK;
@ -1292,7 +1290,7 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
void NotifyDoneWithLrsCallLocked() {
if (!lrs_done_) {
lrs_done_ = true;
lrs_cv_.SignalAll();
lrs_cv_.Broadcast();
}
}

Loading…
Cancel
Save