Prevent direct usage of absl::Mutex (#29424)

* Add script to prevent absl::Mutex occurrences

* sanity and fixes

* shellcheck
pull/28928/head^2
Yash Tibrewal 3 years ago committed by GitHub
parent 59f7149159
commit f28695351e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  2. 8
      src/core/lib/iomgr/event_engine/promise.h
  3. 2
      src/core/lib/iomgr/tcp_posix.cc
  4. 8
      src/core/lib/promise/observable.h
  5. 8
      src/core/lib/security/transport/secure_endpoint.cc
  6. 8
      test/core/promise/activity_test.cc
  7. 4
      test/core/promise/observable_test.cc
  8. 8
      test/core/transport/chttp2/flow_control_test.cc
  9. 8
      test/core/transport/chttp2/graceful_shutdown_test.cc
  10. 9
      test/core/transport/chttp2/streams_not_seen_test.cc
  11. 16
      test/core/transport/chttp2/too_many_pings_test.cc
  12. 7
      test/cpp/end2end/xds/xds_end2end_test.cc
  13. 8
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  14. 36
      tools/run_tests/sanity/check_absl_mutex.sh
  15. 1
      tools/run_tests/sanity/sanity_tests.yaml

@ -344,7 +344,7 @@ class AresDNSResolver : public DNSResolver {
}
void Start() override {
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
Ref().release(); // ref held by resolution
ares_request_ = std::unique_ptr<grpc_ares_request>(grpc_dns_lookup_ares(
"" /* dns_server */, name_.c_str(), default_port_.c_str(),
@ -357,7 +357,7 @@ class AresDNSResolver : public DNSResolver {
void Orphan() override {
{
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
GRPC_CARES_TRACE_LOG("AresRequest:%p Orphan ares_request_:%p", this,
ares_request_.get());
if (ares_request_ != nullptr) {
@ -372,7 +372,7 @@ class AresDNSResolver : public DNSResolver {
AresRequest* r = static_cast<AresRequest*>(arg);
std::vector<grpc_resolved_address> resolved_addresses;
{
absl::MutexLock lock(&r->mu_);
MutexLock lock(&r->mu_);
GRPC_CARES_TRACE_LOG("AresRequest:%p OnDnsLookupDone error:%s", r,
grpc_error_std_string(error).c_str());
if (r->addresses_ != nullptr) {
@ -394,7 +394,7 @@ class AresDNSResolver : public DNSResolver {
// mutex to synchronize access to this object (but not to the ares_request
// object itself).
absl::Mutex mu_;
Mutex mu_;
// the name to resolve
const std::string name_;
// the default port to use if name doesn't have one

@ -29,19 +29,19 @@ template <typename T>
class Promise {
public:
T& Get() {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
cv_.Wait(&mu_);
return val_;
}
void Set(T&& val) {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
val_ = std::move(val);
cv_.Signal();
}
private:
absl::Mutex mu_;
absl::CondVar cv_;
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
T val_;
};

@ -378,7 +378,7 @@ struct grpc_tcp {
/* garbage after the last read */
grpc_slice_buffer last_read_buffer;
absl::Mutex read_mu;
grpc_core::Mutex read_mu;
grpc_slice_buffer* incoming_buffer ABSL_GUARDED_BY(read_mu) = nullptr;
int inq; /* bytes pending on the socket from the last read. */
bool inq_capable; /* cache whether kernel supports inq */

@ -81,14 +81,14 @@ class ObservableState {
}
Poll<absl::optional<T>> PollGet(ObservableVersion* version_seen) {
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
if (!Started()) return Pending();
*version_seen = version_;
return value_;
}
Poll<absl::optional<T>> PollNext(ObservableVersion* version_seen) {
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
if (!NextValueReady(version_seen)) return Pending();
return value_;
}
@ -96,7 +96,7 @@ class ObservableState {
Poll<absl::optional<T>> PollWatch(ObservableVersion* version_seen) {
if (*version_seen == kTombstoneVersion) return Pending();
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
if (!NextValueReady(version_seen)) return Pending();
// Watch needs to be woken up if the value changes even if it's ready now.
waiters_.AddPending(Activity::current()->MakeNonOwningWaker());
@ -131,7 +131,7 @@ class ObservableState {
return true;
}
absl::Mutex mu_;
Mutex mu_;
WaitSet waiters_ ABSL_GUARDED_BY(mu_);
ObservableVersion version_ ABSL_GUARDED_BY(mu_) = 1;
absl::optional<T> value_ ABSL_GUARDED_BY(mu_);

@ -102,8 +102,8 @@ struct secure_endpoint {
struct tsi_frame_protector* protector;
struct tsi_zero_copy_grpc_protector* zero_copy_protector;
gpr_mu protector_mu;
absl::Mutex read_mu;
absl::Mutex write_mu;
grpc_core::Mutex read_mu;
grpc_core::Mutex write_mu;
/* saved upper level callbacks and user_data. */
grpc_closure* read_cb = nullptr;
grpc_closure* write_cb = nullptr;
@ -234,7 +234,7 @@ static void on_read(void* user_data, grpc_error_handle error) {
secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
{
absl::MutexLock l(&ep->read_mu);
grpc_core::MutexLock l(&ep->read_mu);
uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
@ -355,7 +355,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
{
absl::MutexLock l(&ep->write_mu);
grpc_core::MutexLock l(&ep->write_mu);
uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);

@ -38,7 +38,7 @@ class Barrier {
Promise<Result> Wait() {
return [this]() -> Poll<Result> {
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
if (cleared_) {
return Result{};
} else {
@ -56,7 +56,7 @@ class Barrier {
}
private:
absl::Mutex mu_;
Mutex mu_;
WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
bool cleared_ ABSL_GUARDED_BY(mu_) = false;
};
@ -69,7 +69,7 @@ class SingleBarrier {
Promise<Result> Wait() {
return [this]() -> Poll<Result> {
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
if (cleared_) {
return Result{};
} else {
@ -88,7 +88,7 @@ class SingleBarrier {
}
private:
absl::Mutex mu_;
Mutex mu_;
Waker waker_ ABSL_GUARDED_BY(mu_);
bool cleared_ ABSL_GUARDED_BY(mu_) = false;
};

@ -33,7 +33,7 @@ class Barrier {
Promise<Result> Wait() {
return [this]() -> Poll<Result> {
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
if (cleared_) {
return Result{};
} else {
@ -51,7 +51,7 @@ class Barrier {
}
private:
absl::Mutex mu_;
Mutex mu_;
WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
bool cleared_ ABSL_GUARDED_BY(mu_) = false;
};

@ -58,7 +58,7 @@ class TransportTargetWindowSizeMocker
double /* current_target */) override {
// Protecting access to variable window_size_ shared between client and
// server.
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
if (alternating_initial_window_sizes_) {
window_size_ = (window_size_ == kLargeInitialWindowSize)
? kSmallInitialWindowSize
@ -70,20 +70,20 @@ class TransportTargetWindowSizeMocker
// Alternates the initial window size targets. Computes a low values if it was
// previously high, or a high value if it was previously low.
void AlternateTargetInitialWindowSizes() {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
alternating_initial_window_sizes_ = true;
}
void Reset() {
// Protecting access to variable window_size_ shared between client and
// server.
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
alternating_initial_window_sizes_ = false;
window_size_ = kLargeInitialWindowSize;
}
private:
absl::Mutex mu_;
grpc_core::Mutex mu_;
bool alternating_initial_window_sizes_ ABSL_GUARDED_BY(mu_) = false;
double window_size_ ABSL_GUARDED_BY(mu_) = kLargeInitialWindowSize;
};

@ -142,7 +142,7 @@ class GracefulShutdownTest : public ::testing::Test {
GracefulShutdownTest* self = static_cast<GracefulShutdownTest*>(arg);
if (error == GRPC_ERROR_NONE) {
{
absl::MutexLock lock(&self->mu_);
MutexLock lock(&self->mu_);
for (size_t i = 0; i < self->read_buffer_.count; ++i) {
absl::StrAppend(&self->read_bytes_,
StringViewFromSlice(self->read_buffer_.slices[i]));
@ -162,7 +162,7 @@ class GracefulShutdownTest : public ::testing::Test {
void WaitForReadBytes(absl::string_view bytes) {
std::atomic<bool> done{false};
{
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
while (!absl::StrContains(read_bytes_, bytes)) {
read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
}
@ -230,8 +230,8 @@ class GracefulShutdownTest : public ::testing::Test {
std::unique_ptr<std::thread> client_poll_thread_;
std::atomic<bool> shutdown_{false};
grpc_closure on_read_done_;
absl::Mutex mu_;
absl::CondVar read_cv_;
Mutex mu_;
CondVar read_cv_;
absl::Notification read_end_notification_;
grpc_slice_buffer read_buffer_;
std::string read_bytes_ ABSL_GUARDED_BY(mu_);

@ -25,7 +25,6 @@
#include <gmock/gmock.h>
#include "absl/synchronization/mutex.h"
#include "absl/synchronization/notification.h"
#include <grpc/grpc.h>
@ -332,7 +331,7 @@ class StreamsNotSeenTest : public ::testing::Test {
StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
if (error == GRPC_ERROR_NONE) {
{
absl::MutexLock lock(&self->mu_);
MutexLock lock(&self->mu_);
for (size_t i = 0; i < self->read_buffer_.count; ++i) {
absl::StrAppend(&self->read_bytes_,
StringViewFromSlice(self->read_buffer_.slices[i]));
@ -359,7 +358,7 @@ class StreamsNotSeenTest : public ::testing::Test {
}
});
{
absl::MutexLock lock(&mu_);
MutexLock lock(&mu_);
while (!absl::StrContains(read_bytes_, bytes)) {
read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
}
@ -384,8 +383,8 @@ class StreamsNotSeenTest : public ::testing::Test {
grpc_channel* channel_ = nullptr;
grpc_completion_queue* cq_ = nullptr;
cq_verifier* cqv_ = nullptr;
absl::Mutex mu_;
absl::CondVar read_cv_;
Mutex mu_;
CondVar read_cv_;
std::atomic<bool> shutdown_{false};
};

@ -61,36 +61,36 @@ namespace {
class TransportCounter {
public:
static void CounterInitCallback() {
absl::MutexLock lock(&mu());
grpc_core::MutexLock lock(&mu());
++count_;
}
static void CounterDestructCallback() {
absl::MutexLock lock(&mu());
grpc_core::MutexLock lock(&mu());
if (--count_ == 0) {
cv().SignalAll();
}
}
static void WaitForTransportsToBeDestroyed() {
absl::MutexLock lock(&mu());
grpc_core::MutexLock lock(&mu());
while (count_ != 0) {
ASSERT_FALSE(cv().WaitWithTimeout(&mu(), absl::Seconds(10)));
}
}
static int count() {
absl::MutexLock lock(&mu());
grpc_core::MutexLock lock(&mu());
return count_;
}
static absl::Mutex& mu() {
static absl::Mutex* mu = new absl::Mutex();
static grpc_core::Mutex& mu() {
static grpc_core::Mutex* mu = new grpc_core::Mutex();
return *mu;
}
static absl::CondVar& cv() {
static absl::CondVar* cv = new absl::CondVar();
static grpc_core::CondVar& cv() {
static grpc_core::CondVar* cv = new grpc_core::CondVar();
return *cv;
}

@ -41,7 +41,6 @@
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_replace.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
@ -156,17 +155,17 @@ class FakeCertificateProvider final : public grpc_tls_certificate_provider {
class CertDataMapWrapper {
public:
CertDataMap Get() {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
return cert_data_map_;
}
void Set(CertDataMap data) {
absl::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
cert_data_map_ = std::move(data);
}
private:
absl::Mutex mu_;
grpc_core::Mutex mu_;
CertDataMap cert_data_map_ ABSL_GUARDED_BY(mu_);
};

@ -883,8 +883,8 @@ std::vector<XdsEnd2endTest::ConcurrentRpc> XdsEnd2endTest::SendConcurrentRpcs(
std::vector<ConcurrentRpc> rpcs(num_rpcs);
EchoRequest request;
// Variables for synchronization
absl::Mutex mu;
absl::CondVar cv;
grpc_core::Mutex mu;
grpc_core::CondVar cv;
size_t completed = 0;
// Set-off callback RPCs
for (size_t i = 0; i < num_rpcs; i++) {
@ -897,14 +897,14 @@ std::vector<XdsEnd2endTest::ConcurrentRpc> XdsEnd2endTest::SendConcurrentRpcs(
rpc->elapsed_time = NowFromCycleCounter() - t0;
bool done;
{
absl::MutexLock lock(&mu);
grpc_core::MutexLock lock(&mu);
done = (++completed) == num_rpcs;
}
if (done) cv.Signal();
});
}
{
absl::MutexLock lock(&mu);
grpc_core::MutexLock lock(&mu);
cv.Wait(&mu);
}
EXPECT_EQ(completed, num_rpcs);

@ -0,0 +1,36 @@
#!/bin/sh
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
cd "$(dirname "$0")/../../.."
#
# Disallow the usage of absl::Mutex.
# Refer to https://github.com/grpc/grpc/issues/23661 and b/186685878 for context
# as to why absl::Mutex is problematic on some platforms.
#
find . \( \( -name "*.cc" \) -or \( -name "*.h" \) \) \
-a \( \( -wholename "./src/*" \) \
-or \( -wholename "./include/*" \) \
-or \( -wholename "./test/*" \) \
-or \( -wholename "./examples/*" \) \) \
-a -not -wholename "./include/grpcpp/impl/codegen/sync.h" \
-a -not -wholename "./src/core/lib/gprpp/sync.h" \
-a -not -wholename "./src/core/lib/gpr/sync_abseil.cc" \
-print0 |\
xargs -0 grep -n "absl::Mutex" | \
diff - /dev/null

@ -1,4 +1,5 @@
# a set of tests that are run in parallel for sanity tests
- script: tools/run_tests/sanity/check_absl_mutex.sh
- script: tools/run_tests/sanity/check_bad_dependencies.sh
- script: tools/run_tests/sanity/check_bazel_workspace.py
- script: tools/run_tests/sanity/check_buildifier.sh

Loading…
Cancel
Save