From 013e67a02932ff92214d7b9e67500dc1c6b97cf1 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Wed, 4 Aug 2021 20:03:25 -0400 Subject: [PATCH] Deprecate grpc{_core,::internal}::WaitUntil (#26866) It is not possible for such a function to be implemented in a way that is understood by annotalysis. Mark it deprecated and replace instances of its use with direct mutex/condvar usage. Add a bunch of missing thread safety annotations while I'm here. --- include/grpcpp/impl/codegen/sync.h | 1 + src/core/lib/gprpp/sync.h | 7 ---- src/core/lib/surface/server.cc | 4 ++- src/core/lib/surface/server.h | 6 ++-- src/cpp/server/server_cc.cc | 6 ++-- test/core/handshake/client_ssl.cc | 6 ++-- test/core/handshake/server_ssl_common.cc | 6 ++-- test/cpp/end2end/client_lb_end2end_test.cc | 32 ++++++++++--------- test/cpp/end2end/grpclb_end2end_test.cc | 21 ++++++------ test/cpp/end2end/mock_test.cc | 32 +++++++++++-------- .../end2end/service_config_end2end_test.cc | 32 ++++++++++--------- test/cpp/end2end/xds_end2end_test.cc | 10 +++--- 12 files changed, 86 insertions(+), 77 deletions(-) diff --git a/include/grpcpp/impl/codegen/sync.h b/include/grpcpp/impl/codegen/sync.h index b45d7cb8194..1f4e4aac589 100644 --- a/include/grpcpp/impl/codegen/sync.h +++ b/include/grpcpp/impl/codegen/sync.h @@ -144,6 +144,7 @@ class CondVar { #endif // GPR_ABSEIL_SYNC template +GRPC_DEPRECATED("incompatible with thread safety analysis") static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) { while (!pred()) { cv->Wait(mu); diff --git a/src/core/lib/gprpp/sync.h b/src/core/lib/gprpp/sync.h index d04e99740b7..d53585995cb 100644 --- a/src/core/lib/gprpp/sync.h +++ b/src/core/lib/gprpp/sync.h @@ -144,13 +144,6 @@ class CondVar { #endif // GPR_ABSEIL_SYNC -template -static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) { - while (!pred()) { - cv->Wait(mu); - } -} - // Deprecated. Prefer MutexLock class MutexLockForGprMu { public: diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index a8b4aaaecf9..558d41a7d75 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -809,7 +809,9 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) { { // Wait for startup to be finished. Locks mu_global. MutexLock lock(&mu_global_); - WaitUntil(&starting_cv_, &mu_global_, [this] { return !starting_; }); + while (starting_) { + starting_cv_.Wait(&mu_global_); + } // Stay locked, and gather up some stuff to do. GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (shutdown_published_) { diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 413a5a8a0a2..7061c427b67 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -416,9 +416,9 @@ class Server : public InternallyRefCounted { Mutex mu_global_; // mutex for server and channel state Mutex mu_call_; // mutex for call-specific state - // startup synchronization: flag is protected by mu_global_, signals whether - // we are doing the listener start routine or not. - bool starting_ = false; + // startup synchronization: flag, signals whether we are doing the listener + // start routine or not. + bool starting_ ABSL_GUARDED_BY(mu_global_) = false; CondVar starting_cv_; std::vector> registered_methods_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index c82c83c1732..efa12aa79f2 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -1110,9 +1110,9 @@ void Server::UnrefAndWaitLocked() { shutdown_done_ = true; return; // no need to wait on CV since done condition already set } - grpc::internal::WaitUntil( - &shutdown_done_cv_, &mu_, - [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; }); + while (!shutdown_done_) { + shutdown_done_cv_.Wait(&mu_); + } } void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { diff --git a/test/core/handshake/client_ssl.cc b/test/core/handshake/client_ssl.cc index 8ec239ab231..78413cc4630 100644 --- a/test/core/handshake/client_ssl.cc +++ b/test/core/handshake/client_ssl.cc @@ -63,13 +63,15 @@ class SslLibraryInfo { void Await() { grpc_core::MutexLock lock(&mu_); - grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; }); + while (!ready_) { + cv_.Wait(&mu_); + } } private: grpc_core::Mutex mu_; grpc_core::CondVar cv_; - bool ready_ = false; + bool ready_ ABSL_GUARDED_BY(mu_) = false; }; // Arguments for TLS server thread. diff --git a/test/core/handshake/server_ssl_common.cc b/test/core/handshake/server_ssl_common.cc index 026a52c69ee..ce410520f39 100644 --- a/test/core/handshake/server_ssl_common.cc +++ b/test/core/handshake/server_ssl_common.cc @@ -86,14 +86,16 @@ class ServerInfo { void Await() { grpc_core::MutexLock lock(&mu_); - grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; }); + while (!ready_) { + cv_.Wait(&mu_); + } } private: const int port_; grpc_core::Mutex mu_; grpc_core::CondVar cv_; - bool ready_ = false; + bool ready_ ABSL_GUARDED_BY(mu_) = false; }; // Simple gRPC server. This listens until client_handshake_complete occurs. diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 0e66a9b9c04..4e0ab6e35fb 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -357,32 +357,33 @@ class ClientLbEnd2endTest : public ::testing::Test { } struct ServerData { - int port_; + const int port_; std::unique_ptr server_; MyTestServiceImpl service_; std::unique_ptr thread_; - bool server_ready_ = false; - bool started_ = false; - explicit ServerData(int port = 0) { - port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); - } + grpc::internal::Mutex mu_; + grpc::internal::CondVar cond_; + bool server_ready_ ABSL_GUARDED_BY(mu_) = false; + bool started_ ABSL_GUARDED_BY(mu_) = false; + + explicit ServerData(int port = 0) + : port_(port > 0 ? port : grpc_pick_unused_port_or_die()) {} void Start(const std::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); + grpc::internal::MutexLock lock(&mu_); started_ = true; - grpc::internal::Mutex mu; - grpc::internal::MutexLock lock(&mu); - grpc::internal::CondVar cond; thread_ = absl::make_unique( - std::bind(&ServerData::Serve, this, server_host, &mu, &cond)); - grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; }); + std::bind(&ServerData::Serve, this, server_host)); + while (!server_ready_) { + cond_.Wait(&mu_); + } server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); } - void Serve(const std::string& server_host, grpc::internal::Mutex* mu, - grpc::internal::CondVar* cond) { + void Serve(const std::string& server_host) { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -391,12 +392,13 @@ class ClientLbEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), std::move(creds)); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - grpc::internal::MutexLock lock(mu); + grpc::internal::MutexLock lock(&mu_); server_ready_ = true; - cond->Signal(); + cond_.Signal(); } void Shutdown() { + grpc::internal::MutexLock lock(&mu_); if (!started_) return; server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); thread_->join(); diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index f4045947c1e..e6e2e9fdc3d 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -274,8 +274,9 @@ class BalancerServiceImpl : public BalancerService { } { grpc::internal::MutexLock lock(&mu_); - grpc::internal::WaitUntil(&serverlist_cond_, &mu_, - [this] { return serverlist_done_; }); + while (!serverlist_done_) { + serverlist_cond_.Wait(&mu_); + } } if (client_load_reporting_interval_seconds_ > 0) { @@ -304,7 +305,7 @@ class BalancerServiceImpl : public BalancerService { // below from firing before its corresponding wait is executed. grpc::internal::MutexLock lock(&mu_); load_report_queue_.emplace_back(std::move(load_report)); - if (load_report_cond_ != nullptr) load_report_cond_->Signal(); + load_report_cond_.Signal(); } } } @@ -332,12 +333,10 @@ class BalancerServiceImpl : public BalancerService { ClientStats WaitForLoadReport() { grpc::internal::MutexLock lock(&mu_); - grpc::internal::CondVar cv; if (load_report_queue_.empty()) { - load_report_cond_ = &cv; - grpc::internal::WaitUntil(load_report_cond_, &mu_, - [this] { return !load_report_queue_.empty(); }); - load_report_cond_ = nullptr; + while (load_report_queue_.empty()) { + load_report_cond_.Wait(&mu_); + } } ClientStats load_report = std::move(load_report_queue_.front()); load_report_queue_.pop_front(); @@ -376,9 +375,9 @@ class BalancerServiceImpl : public BalancerService { grpc::internal::Mutex mu_; grpc::internal::CondVar serverlist_cond_; - bool serverlist_done_ = false; - grpc::internal::CondVar* load_report_cond_ = nullptr; - std::deque load_report_queue_; + bool serverlist_done_ ABSL_GUARDED_BY(mu_) = false; + grpc::internal::CondVar load_report_cond_; + std::deque load_report_queue_ ABSL_GUARDED_BY(mu_); }; class GrpclbEnd2endTest : public ::testing::Test { diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 82b955d3c4e..40a8b137a9a 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -29,6 +29,8 @@ #include #include +#include "absl/types/optional.h" + #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/echo_mock.grpc.pb.h" @@ -191,28 +193,30 @@ TEST_F(MockCallbackTest, MockedCallSucceedsWithWait) { CallbackServerContext ctx; EchoRequest req; EchoResponse resp; - grpc::internal::Mutex mu; - grpc::internal::CondVar cv; - grpc::Status status; - bool status_set = false; + struct { + grpc::internal::Mutex mu; + grpc::internal::CondVar cv; + absl::optional ABSL_GUARDED_BY(mu) status; + } status; DefaultReactorTestPeer peer(&ctx, [&](::grpc::Status s) { - grpc::internal::MutexLock l(&mu); - status_set = true; - status = std::move(s); - cv.Signal(); + grpc::internal::MutexLock l(&status.mu); + status.status = std::move(s); + status.cv.Signal(); }); req.set_message("mock 1"); auto* reactor = service_.Echo(&ctx, &req, &resp); - grpc::internal::WaitUntil(&cv, &mu, [&] { - grpc::internal::MutexLock l(&mu); - return status_set; - }); + + grpc::internal::MutexLock l(&status.mu); + while (!status.status.has_value()) { + status.cv.Wait(&status.mu); + } + EXPECT_EQ(reactor, peer.reactor()); EXPECT_TRUE(peer.test_status_set()); EXPECT_TRUE(peer.test_status().ok()); - EXPECT_TRUE(status_set); - EXPECT_TRUE(status.ok()); + EXPECT_TRUE(status.status.has_value()); + EXPECT_TRUE(status.status.value().ok()); EXPECT_EQ(req.message(), resp.message()); } diff --git a/test/cpp/end2end/service_config_end2end_test.cc b/test/cpp/end2end/service_config_end2end_test.cc index 67e02bc72a1..5b3f2f71a47 100644 --- a/test/cpp/end2end/service_config_end2end_test.cc +++ b/test/cpp/end2end/service_config_end2end_test.cc @@ -300,32 +300,33 @@ class ServiceConfigEnd2endTest : public ::testing::Test { } struct ServerData { - int port_; + const int port_; std::unique_ptr server_; MyTestServiceImpl service_; std::unique_ptr thread_; - bool server_ready_ = false; - bool started_ = false; - explicit ServerData(int port = 0) { - port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); - } + grpc::internal::Mutex mu_; + grpc::internal::CondVar cond_; + bool server_ready_ ABSL_GUARDED_BY(mu_) = false; + bool started_ ABSL_GUARDED_BY(mu_) = false; + + explicit ServerData(int port = 0) + : port_(port > 0 ? port : grpc_pick_unused_port_or_die()) {} void Start(const std::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); + grpc::internal::MutexLock lock(&mu_); started_ = true; - grpc::internal::Mutex mu; - grpc::internal::MutexLock lock(&mu); - grpc::internal::CondVar cond; thread_ = absl::make_unique( - std::bind(&ServerData::Serve, this, server_host, &mu, &cond)); - grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; }); + std::bind(&ServerData::Serve, this, server_host)); + while (!server_ready_) { + cond_.Wait(&mu_); + } server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); } - void Serve(const std::string& server_host, grpc::internal::Mutex* mu, - grpc::internal::CondVar* cond) { + void Serve(const std::string& server_host) { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -334,12 +335,13 @@ class ServiceConfigEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), std::move(creds)); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - grpc::internal::MutexLock lock(mu); + grpc::internal::MutexLock lock(&mu_); server_ready_ = true; - cond->Signal(); + cond_.Signal(); } void Shutdown() { + grpc::internal::MutexLock lock(&mu_); if (!started_) return; server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); thread_->join(); diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index c2f8821f15f..c3a5d6b5c32 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1211,8 +1211,9 @@ class LrsServiceImpl : public std::enable_shared_from_this { grpc_core::CondVar cv; if (result_queue_.empty()) { load_report_cond_ = &cv; - grpc_core::WaitUntil(load_report_cond_, &load_report_mu_, - [this] { return !result_queue_.empty(); }); + while (result_queue_.empty()) { + cv.Wait(&load_report_mu_); + } load_report_cond_ = nullptr; } std::vector result = std::move(result_queue_.front()); @@ -1279,8 +1280,9 @@ class LrsServiceImpl : public std::enable_shared_from_this { } // Wait until notified done. grpc_core::MutexLock lock(&parent_->lrs_mu_); - grpc_core::WaitUntil(&parent_->lrs_cv_, &parent_->lrs_mu_, - [this] { return parent_->lrs_done_; }); + while (!parent_->lrs_done_) { + parent_->lrs_cv_.Wait(&parent_->lrs_mu_); + } } gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this); return Status::OK;