[infra] Fix absl::Mutex check and remove all uses (#33144)

`tools/run_tests/sanity/check_absl_mutex.sh` was broken: a missing paren
crashed the script when run locally. It's not yet clear why our sanity
checks didn't complain about this; `run_tests.py` does not save the
log.
Authored by AJ Heller, committed via GitHub (2 years ago)
commit 252ebad341 (parent 239d3e6857)
12 changed files:

1. BUILD (1 change)
2. src/core/BUILD (2 changes)
3. src/core/ext/filters/client_channel/channel_connectivity.cc (8 changes)
4. src/core/lib/event_engine/posix_engine/posix_engine_listener.cc (6 changes)
5. src/core/lib/event_engine/posix_engine/posix_engine_listener.h (4 changes)
6. src/core/lib/resource_quota/memory_quota.cc (14 changes)
7. src/core/lib/resource_quota/memory_quota.h (3 changes)
8. test/cpp/ext/filters/census/library.h (10 changes)
9. test/cpp/interop/backend_metrics_lb_policy.cc (26 changes)
10. test/cpp/interop/backend_metrics_lb_policy.h (5 changes)
11. test/cpp/interop/interop_server.cc (5 changes)
12. tools/run_tests/sanity/check_absl_mutex.sh (4 changes)
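
Most of what follows is a mechanical swap: `absl::Mutex`/`absl::MutexLock` become the `grpc_core::Mutex`/`grpc_core::MutexLock` wrappers from `src/core/lib/gprpp/sync.h`, which wrap the absl types, so `ABSL_GUARDED_BY` annotations keep working. A minimal sketch of the post-change locking shape (the `Counter` class is illustrative, not part of the diff):

#include "absl/base/thread_annotations.h"

#include "src/core/lib/gprpp/sync.h"

// Illustrative only: the locking pattern the converted files end up with.
class Counter {
 public:
  void Increment() {
    grpc_core::MutexLock lock(&mu_);  // RAII lock, drop-in for absl::MutexLock
    ++count_;
  }
  int Get() {
    grpc_core::MutexLock lock(&mu_);
    return count_;
  }

 private:
  grpc_core::Mutex mu_;  // wraps absl::Mutex; the sanity check allows this header
  int count_ ABSL_GUARDED_BY(mu_) = 0;
};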

--- a/BUILD
+++ b/BUILD
@@ -2971,7 +2971,6 @@ grpc_cc_library(
         "absl/status:statusor",
         "absl/strings",
         "absl/strings:cord",
-        "absl/synchronization",
         "absl/types:optional",
         "absl/types:variant",
         "upb_collections_lib",

--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -1066,7 +1066,6 @@ grpc_cc_library(
         "absl/container:flat_hash_set",
         "absl/status",
         "absl/strings",
-        "absl/synchronization",
         "absl/types:optional",
     ],
     deps = [
@@ -1921,7 +1920,6 @@ grpc_cc_library(
         "absl/status",
         "absl/status:statusor",
         "absl/strings",
-        "absl/synchronization",
         "absl/types:optional",
     ],
     deps = [

--- a/src/core/ext/filters/client_channel/channel_connectivity.cc
+++ b/src/core/ext/filters/client_channel/channel_connectivity.cc
@@ -20,7 +20,6 @@
 #include "absl/base/thread_annotations.h"
 #include "absl/status/status.h"
-#include "absl/synchronization/mutex.h"
 #include "absl/types/optional.h"
 
 #include <grpc/event_engine/event_engine.h>
@@ -36,6 +35,7 @@
 #include "src/core/lib/gprpp/crash.h"
 #include "src/core/lib/gprpp/dual_ref_counted.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/gprpp/time.h"
 #include "src/core/lib/iomgr/closure.h"
 #include "src/core/lib/iomgr/error.h"
@@ -169,7 +169,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
   void StartTimer(Timestamp deadline) {
     const Duration timeout = deadline - Timestamp::Now();
-    absl::MutexLock lock(&mu_);
+    MutexLock lock(&mu_);
     timer_handle_ = channel_->channel_stack()->EventEngine()->RunAfter(
         timeout, [self = Ref()]() mutable {
           ApplicationCallbackExecCtx callback_exec_ctx;
@@ -186,7 +186,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
       GRPC_LOG_IF_ERROR("watch_completion_error", error);
     }
     {
-      absl::MutexLock lock(&self->mu_);
+      MutexLock lock(&self->mu_);
       if (self->timer_handle_.has_value()) {
         self->channel_->channel_stack()->EventEngine()->Cancel(
             *self->timer_handle_);
@@ -238,7 +238,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
   // timer_handle_ might be accessed in parallel from multiple threads, e.g.
   // timer callback fired immediately on an EventEngine thread before
   // RunAfter() returns.
-  absl::Mutex mu_;
+  Mutex mu_;
   absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
       timer_handle_ ABSL_GUARDED_BY(mu_);
   bool timer_fired_ = false;
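
The comment in the last hunk is the reason `mu_` exists: `RunAfter()` may run its closure on an EventEngine thread before the caller has even stored the returned handle, so storing the handle and cancelling it must both happen under the same lock. A stand-alone sketch of that pattern with illustrative names (`OneShotTimer` is not from the tree; the real `StateWatcher` additionally keeps itself alive by capturing a ref in the closure):

#include <chrono>
#include <memory>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/event_engine.h>

#include "src/core/lib/gprpp/sync.h"

using ::grpc_event_engine::experimental::EventEngine;

// Illustrative class showing the guarded-handle pattern above. Either the
// closure runs and clears the handle, or a later MaybeCancel() finds it
// still set; the mutex makes the two outcomes mutually exclusive.
class OneShotTimer {
 public:
  explicit OneShotTimer(std::shared_ptr<EventEngine> engine)
      : engine_(std::move(engine)) {}

  void Start(std::chrono::milliseconds delay) {
    // Hold the lock across RunAfter(): the closure may fire on another
    // EventEngine thread before the handle assignment below completes.
    grpc_core::MutexLock lock(&mu_);
    handle_ = engine_->RunAfter(delay, [this] {
      grpc_core::MutexLock lock(&mu_);
      handle_.reset();  // timer fired; nothing left to cancel
    });
  }

  void MaybeCancel() {
    grpc_core::MutexLock lock(&mu_);
    if (handle_.has_value()) {
      engine_->Cancel(*handle_);
      handle_.reset();
    }
  }

 private:
  std::shared_ptr<EventEngine> engine_;
  grpc_core::Mutex mu_;
  absl::optional<EventEngine::TaskHandle> handle_ ABSL_GUARDED_BY(mu_);
};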

--- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
+++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
@@ -64,7 +64,7 @@ PosixEngineListenerImpl::PosixEngineListenerImpl(
 absl::StatusOr<int> PosixEngineListenerImpl::Bind(
     const EventEngine::ResolvedAddress& addr,
     PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd) {
-  absl::MutexLock lock(&this->mu_);
+  grpc_core::MutexLock lock(&this->mu_);
   if (this->started_) {
     return absl::FailedPreconditionError(
         "Listener is already started, ports can no longer be bound");
@@ -254,7 +254,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() {
 }
 
 absl::Status PosixEngineListenerImpl::Start() {
-  absl::MutexLock lock(&this->mu_);
+  grpc_core::MutexLock lock(&this->mu_);
   // Start each asynchronous acceptor.
   GPR_ASSERT(!this->started_);
   this->started_ = true;
@@ -267,7 +267,7 @@ absl::Status PosixEngineListenerImpl::Start() {
 void PosixEngineListenerImpl::TriggerShutdown() {
   // This would get invoked from the destructor of the parent
   // PosixEngineListener object.
-  absl::MutexLock lock(&this->mu_);
+  grpc_core::MutexLock lock(&this->mu_);
   for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) {
     // Trigger shutdown of each asynchronous acceptor. This in-turn calls
     // ShutdownHandle on the associated poller event handle. It may also

--- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h
+++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h
@@ -28,7 +28,6 @@
 #include "absl/functional/any_invocable.h"
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
-#include "absl/synchronization/mutex.h"
 
 #include <grpc/event_engine/endpoint_config.h>
 #include <grpc/event_engine/event_engine.h>
@@ -36,6 +35,7 @@
 #include <grpc/event_engine/slice_buffer.h>
 
 #include "src/core/lib/event_engine/posix.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/port.h"
 
 #ifdef GRPC_POSIX_SOCKET_TCP
@@ -171,7 +171,7 @@ class PosixEngineListenerImpl
   friend class AsyncConnectionAcceptor;
   // The mutex ensures thread safety when multiple threads try to call Bind
   // and Start in parallel.
-  absl::Mutex mu_;
+  grpc_core::Mutex mu_;
   PosixEventPoller* poller_;
   PosixTcpOptions options_;
   std::shared_ptr<EventEngine> engine_;

--- a/src/core/lib/resource_quota/memory_quota.cc
+++ b/src/core/lib/resource_quota/memory_quota.cc
@@ -453,7 +453,7 @@ void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
   AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);
   {
-    absl::MutexLock l(&shard.shard_mu);
+    MutexLock l(&shard.shard_mu);
     shard.allocators.emplace(allocator);
   }
 }
@@ -467,7 +467,7 @@ void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
       small_allocators_.SelectShard(allocator);
   {
-    absl::MutexLock l(&small_shard.shard_mu);
+    MutexLock l(&small_shard.shard_mu);
     if (small_shard.allocators.erase(allocator) == 1) {
       return;
     }
@@ -476,7 +476,7 @@ void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
   AllocatorBucket::Shard& big_shard = big_allocators_.SelectShard(allocator);
   {
-    absl::MutexLock l(&big_shard.shard_mu);
+    MutexLock l(&big_shard.shard_mu);
     big_shard.allocators.erase(allocator);
   }
 }
@@ -513,14 +513,14 @@ void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
   AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);
   {
-    absl::MutexLock l(&old_shard.shard_mu);
+    MutexLock l(&old_shard.shard_mu);
     if (old_shard.allocators.erase(allocator) == 0) return;
   }
   AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);
   {
-    absl::MutexLock l(&new_shard.shard_mu);
+    MutexLock l(&new_shard.shard_mu);
     new_shard.allocators.emplace(allocator);
   }
 }
@@ -534,14 +534,14 @@ void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
   AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);
   {
-    absl::MutexLock l(&old_shard.shard_mu);
+    MutexLock l(&old_shard.shard_mu);
     if (old_shard.allocators.erase(allocator) == 0) return;
   }
   AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);
   {
-    absl::MutexLock l(&new_shard.shard_mu);
+    MutexLock l(&new_shard.shard_mu);
     new_shard.allocators.emplace(allocator);
   }
 }

--- a/src/core/lib/resource_quota/memory_quota.h
+++ b/src/core/lib/resource_quota/memory_quota.h
@@ -30,7 +30,6 @@
 #include "absl/base/thread_annotations.h"
 #include "absl/container/flat_hash_set.h"
 #include "absl/strings/string_view.h"
-#include "absl/synchronization/mutex.h"
 #include "absl/types/optional.h"
 
 #include <grpc/event_engine/memory_allocator.h>
@@ -340,7 +339,7 @@ class BasicMemoryQuota final
     struct Shard {
       absl::flat_hash_set<GrpcMemoryAllocatorImpl*> allocators
          ABSL_GUARDED_BY(shard_mu);
-      absl::Mutex shard_mu;
+      Mutex shard_mu;
     };
 
     Shard& SelectShard(void* key) {
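
The `Shard` struct above is one bucket of a small sharded-lock scheme: `SelectShard()` maps an allocator pointer to one of several shards, each with its own mutex and hash set, so concurrent allocator registration and removal rarely contend on a single global lock. A self-contained sketch of the idea under illustrative names (`SimpleShardedSet`, `kShards`); the real `AllocatorBucket` may differ in details:

#include <cstddef>
#include <cstdint>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"

#include "src/core/lib/gprpp/sync.h"

// Illustrative sketch: a set split across kShards independently locked
// buckets to reduce mutex contention.
template <typename T, size_t kShards = 16>
class SimpleShardedSet {
 public:
  void Insert(T* p) {
    Shard& s = SelectShard(p);
    grpc_core::MutexLock l(&s.mu);
    s.items.emplace(p);
  }
  bool Erase(T* p) {
    Shard& s = SelectShard(p);
    grpc_core::MutexLock l(&s.mu);
    return s.items.erase(p) == 1;
  }

 private:
  struct Shard {
    grpc_core::Mutex mu;
    absl::flat_hash_set<T*> items ABSL_GUARDED_BY(mu);
  };
  // Derive the shard from the pointer value; shift past alignment bits so
  // nearby allocations spread out. A real implementation would likely mix
  // the bits with a proper hash first.
  Shard& SelectShard(void* key) {
    return shards_[(reinterpret_cast<uintptr_t>(key) >> 4) % kShards];
  }
  Shard shards_[kShards];
};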

--- a/test/cpp/ext/filters/census/library.h
+++ b/test/cpp/ext/filters/census/library.h
@@ -97,7 +97,7 @@ class ExportedTracesRecorder
   ExportedTracesRecorder() : is_recording_(false) {}
   void Export(const std::vector<::opencensus::trace::exporter::SpanData>& spans)
       override {
-    absl::MutexLock lock(&mutex_);
+    grpc_core::MutexLock lock(&mutex_);
     if (is_recording_) {
       for (auto const& span : spans) {
         recorded_spans_.push_back(span);
@@ -106,26 +106,26 @@ class ExportedTracesRecorder
   }
 
   void StartRecording() {
-    absl::MutexLock lock(&mutex_);
+    grpc_core::MutexLock lock(&mutex_);
     ASSERT_FALSE(is_recording_);
     is_recording_ = true;
   }
 
   void StopRecording() {
-    absl::MutexLock lock(&mutex_);
+    grpc_core::MutexLock lock(&mutex_);
     ASSERT_TRUE(is_recording_);
     is_recording_ = false;
   }
 
   std::vector<::opencensus::trace::exporter::SpanData> GetAndClearSpans() {
-    absl::MutexLock lock(&mutex_);
+    grpc_core::MutexLock lock(&mutex_);
     return std::move(recorded_spans_);
   }
 
  private:
   // This mutex is necessary as the SpanExporter runs a loop on a separate
   // thread which periodically exports spans.
-  absl::Mutex mutex_;
+  grpc_core::Mutex mutex_;
   bool is_recording_ ABSL_GUARDED_BY(mutex_);
   std::vector<::opencensus::trace::exporter::SpanData> recorded_spans_
       ABSL_GUARDED_BY(mutex_);

--- a/test/cpp/interop/backend_metrics_lb_policy.cc
+++ b/test/cpp/interop/backend_metrics_lb_policy.cc
@@ -236,21 +236,22 @@ void RegisterBackendMetricsLbPolicy(CoreConfiguration::Builder* builder) {
 void LoadReportTracker::RecordPerRpcLoadReport(
     const grpc_core::BackendMetricData* backend_metric_data) {
-  absl::MutexLock lock(&load_reports_mu_);
+  grpc_core::MutexLock lock(&load_reports_mu_);
   per_rpc_load_reports_.emplace_back(
       BackendMetricDataToOrcaLoadReport(backend_metric_data));
 }
 
 void LoadReportTracker::RecordOobLoadReport(
     const grpc_core::BackendMetricData& oob_metric_data) {
-  absl::MutexLock lock(&load_reports_mu_);
+  grpc_core::MutexLock lock(&load_reports_mu_);
   oob_load_reports_.emplace_back(
       *BackendMetricDataToOrcaLoadReport(&oob_metric_data));
+  load_reports_cv_.Signal();
 }
 
 absl::optional<LoadReportTracker::LoadReportEntry>
 LoadReportTracker::GetNextLoadReport() {
-  absl::MutexLock lock(&load_reports_mu_);
+  grpc_core::MutexLock lock(&load_reports_mu_);
   if (per_rpc_load_reports_.empty()) {
     return absl::nullopt;
   }
@@ -262,16 +263,17 @@ LoadReportTracker::GetNextLoadReport() {
 LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport(
     const std::function<bool(const TestOrcaReport&)>& predicate,
     absl::Duration poll_timeout, size_t max_attempts) {
-  absl::MutexLock lock(&load_reports_mu_);
+  grpc_core::MutexLock lock(&load_reports_mu_);
   // This condition will be called under lock
-  auto condition = [&]() ABSL_NO_THREAD_SAFETY_ANALYSIS {
-    return !oob_load_reports_.empty();
-  };
   for (size_t i = 0; i < max_attempts; i++) {
-    if (!load_reports_mu_.AwaitWithTimeout(absl::Condition(&condition),
-                                           poll_timeout)) {
-      return absl::nullopt;
-    }
+    auto deadline = absl::Now() + poll_timeout;
+    // loop to handle spurious wakeups.
+    do {
+      if (absl::Now() >= deadline) {
+        return absl::nullopt;
+      }
+      load_reports_cv_.WaitWithDeadline(&load_reports_mu_, deadline);
+    } while (oob_load_reports_.empty());
     auto report = std::move(oob_load_reports_.front());
     oob_load_reports_.pop_front();
     if (predicate(report)) {
@@ -283,7 +285,7 @@ LoadReportTracker::LoadReportEntry LoadReportTracker::WaitForOobLoadReport(
 }
 
 void LoadReportTracker::ResetCollectedLoadReports() {
-  absl::MutexLock lock(&load_reports_mu_);
+  grpc_core::MutexLock lock(&load_reports_mu_);
   per_rpc_load_reports_.clear();
   oob_load_reports_.clear();
 }
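
This is the one non-mechanical change in the commit. `absl::Mutex::AwaitWithTimeout()` blocks on a predicate evaluated inside the mutex, but the `grpc_core` wrapper exposes no equivalent, so the code moves to the classic condition-variable idiom: the producer signals after pushing, and the consumer re-checks its predicate in a loop because `CondVar` waits can wake spuriously. The same idiom in self-contained form (queue and method names are illustrative, not from the tree):

#include <deque>

#include "absl/base/thread_annotations.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"

#include "src/core/lib/gprpp/sync.h"

// Illustrative producer/consumer queue using the same wait pattern as
// WaitForOobLoadReport above.
class ReportQueue {
 public:
  void Push(int report) {
    grpc_core::MutexLock lock(&mu_);
    reports_.push_back(report);
    cv_.Signal();  // pair every push with a signal so a waiter wakes up
  }

  // Returns nullopt if the deadline passes with the queue still empty.
  absl::optional<int> PopWithTimeout(absl::Duration timeout) {
    grpc_core::MutexLock lock(&mu_);
    const absl::Time deadline = absl::Now() + timeout;
    // Re-check the predicate after every wakeup: the wait can return
    // spuriously, and another consumer may have drained the queue first.
    while (reports_.empty()) {
      if (absl::Now() >= deadline) return absl::nullopt;
      cv_.WaitWithDeadline(&mu_, deadline);
    }
    int report = reports_.front();
    reports_.pop_front();
    return report;
  }

 private:
  grpc_core::Mutex mu_;
  grpc_core::CondVar cv_;
  std::deque<int> reports_ ABSL_GUARDED_BY(mu_);
};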

--- a/test/cpp/interop/backend_metrics_lb_policy.h
+++ b/test/cpp/interop/backend_metrics_lb_policy.h
@@ -49,7 +49,8 @@ class LoadReportTracker {
       ABSL_GUARDED_BY(load_reports_mu_);
   std::deque<TestOrcaReport> oob_load_reports_
       ABSL_GUARDED_BY(load_reports_mu_);
-  absl::Mutex load_reports_mu_;
+  grpc_core::Mutex load_reports_mu_;
+  grpc_core::CondVar load_reports_cv_ ABSL_GUARDED_BY(load_reports_mu_);
 };
 
 void RegisterBackendMetricsLbPolicy(
@@ -57,4 +58,4 @@ void RegisterBackendMetricsLbPolicy(
 }  // namespace testing
 }  // namespace grpc
 
 #endif  // GRPC_TEST_CPP_INTEROP_BACKEND_METRICS_LB_POLICY_H

--- a/test/cpp/interop/interop_server.cc
+++ b/test/cpp/interop/interop_server.cc
@@ -36,6 +36,7 @@
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/crash.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/proto/grpc/testing/empty.pb.h"
 #include "src/proto/grpc/testing/messages.pb.h"
 #include "src/proto/grpc/testing/test.grpc.pb.h"
@@ -367,7 +368,7 @@ class TestServiceImpl : public TestService::Service {
       server_metric_recorder_->SetMemoryUtilization(
           request_metrics.memory_utilization());
     }
-    absl::MutexLock lock(&retained_utilization_names_mu_);
+    grpc_core::MutexLock lock(&retained_utilization_names_mu_);
     std::map<grpc::string_ref, double> named_utilizations;
     for (const auto& p : request_metrics.utilization()) {
       const auto& key = *retained_utilization_names_.insert(p.first).first;
@@ -379,7 +380,7 @@ class TestServiceImpl : public TestService::Service {
   grpc::experimental::ServerMetricRecorder* server_metric_recorder_;
   std::set<std::string> retained_utilization_names_
       ABSL_GUARDED_BY(retained_utilization_names_mu_);
-  absl::Mutex retained_utilization_names_mu_;
+  grpc_core::Mutex retained_utilization_names_mu_;
 };
 
 void grpc::testing::interop::RunServer(

And the sanity-script fix itself: the path group opened by `-a \(` was never closed, so `find` died on the unbalanced parens; the fix adds the missing `\)` after the `./test/*` alternative and points the allow-list at `./include/grpcpp/impl/sync.h`, where the wrapper header now lives.

--- a/tools/run_tests/sanity/check_absl_mutex.sh
+++ b/tools/run_tests/sanity/check_absl_mutex.sh
@@ -26,8 +26,8 @@ cd "$(dirname "$0")/../../.."
 find . \( \( -name "*.cc" \) -or \( -name "*.h" \) \) \
     -a \( \( -wholename "./src/*" \) \
     -or \( -wholename "./include/*" \) \
-    -or \( -wholename "./test/*" \) \
-    -a -not -wholename "./include/grpcpp/impl/codegen/sync.h" \
+    -or \( -wholename "./test/*" \) \) \
+    -a -not -wholename "./include/grpcpp/impl/sync.h" \
     -a -not -wholename "./src/core/lib/gprpp/sync.h" \
     -a -not -wholename "./src/core/lib/gpr/sync_abseil.cc" \
     -print0 |\
