Deprecate grpc{_core,::internal}::WaitUntil (#26866)

It is not possible for such a function to be implemented in a way that
is understood by annotalysis. Mark it deprecated and replace instances
of its use with direct mutex/condvar usage.

Add a bunch of missing thread safety annotations while I'm here.
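
For illustration, a minimal sketch of the pattern this change applies everywhere; the Widget class below is hypothetical, not code from this commit. The predicate-based WaitUntil call becomes an explicit while/Wait loop, and the flag it polls is annotated with ABSL_GUARDED_BY so that Clang's thread safety analysis (annotalysis) can check the locking.

// Sketch only (hypothetical example, not part of the diff below).
#include "absl/base/thread_annotations.h"
#include "src/core/lib/gprpp/sync.h"

class Widget {
 public:
  void SignalReady() {
    grpc_core::MutexLock lock(&mu_);
    ready_ = true;
    cv_.Signal();
  }

  void AwaitReady() {
    grpc_core::MutexLock lock(&mu_);
    // Previously: grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; });
    // Now an explicit loop, which annotalysis can see holds mu_ across Wait().
    while (!ready_) {
      cv_.Wait(&mu_);
    }
  }

 private:
  grpc_core::Mutex mu_;
  grpc_core::CondVar cv_;
  bool ready_ ABSL_GUARDED_BY(mu_) = false;
};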
Branch: pull/26874/head
Author: Tamir Duberstein, 4 years ago, committed by GitHub
Parent: 239acada8d
Commit: 013e67a029
GPG signature: no known key found for this signature in database (GPG Key ID: 4AEE18F83AFDEB23)
Changed files:

  1. include/grpcpp/impl/codegen/sync.h (1 changed line)
  2. src/core/lib/gprpp/sync.h (7 changed lines)
  3. src/core/lib/surface/server.cc (4 changed lines)
  4. src/core/lib/surface/server.h (6 changed lines)
  5. src/cpp/server/server_cc.cc (6 changed lines)
  6. test/core/handshake/client_ssl.cc (6 changed lines)
  7. test/core/handshake/server_ssl_common.cc (6 changed lines)
  8. test/cpp/end2end/client_lb_end2end_test.cc (32 changed lines)
  9. test/cpp/end2end/grpclb_end2end_test.cc (21 changed lines)
  10. test/cpp/end2end/mock_test.cc (32 changed lines)
  11. test/cpp/end2end/service_config_end2end_test.cc (32 changed lines)
  12. test/cpp/end2end/xds_end2end_test.cc (10 changed lines)

include/grpcpp/impl/codegen/sync.h
@@ -144,6 +144,7 @@ class CondVar {
 #endif  // GPR_ABSEIL_SYNC
 template <typename Predicate>
+GRPC_DEPRECATED("incompatible with thread safety analysis")
 static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) {
   while (!pred()) {
     cv->Wait(mu);

src/core/lib/gprpp/sync.h
@@ -144,13 +144,6 @@ class CondVar {
 #endif  // GPR_ABSEIL_SYNC
-template <typename Predicate>
-static void WaitUntil(CondVar* cv, Mutex* mu, Predicate pred) {
-  while (!pred()) {
-    cv->Wait(mu);
-  }
-}
 // Deprecated. Prefer MutexLock
 class MutexLockForGprMu {
  public:

src/core/lib/surface/server.cc
@@ -809,7 +809,9 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
   {
     // Wait for startup to be finished. Locks mu_global.
     MutexLock lock(&mu_global_);
-    WaitUntil(&starting_cv_, &mu_global_, [this] { return !starting_; });
+    while (starting_) {
+      starting_cv_.Wait(&mu_global_);
+    }
     // Stay locked, and gather up some stuff to do.
     GPR_ASSERT(grpc_cq_begin_op(cq, tag));
     if (shutdown_published_) {

src/core/lib/surface/server.h
@@ -416,9 +416,9 @@ class Server : public InternallyRefCounted<Server> {
   Mutex mu_global_;  // mutex for server and channel state
   Mutex mu_call_;    // mutex for call-specific state
-  // startup synchronization: flag is protected by mu_global_, signals whether
-  // we are doing the listener start routine or not.
-  bool starting_ = false;
+  // startup synchronization: flag, signals whether we are doing the listener
+  // start routine or not.
+  bool starting_ ABSL_GUARDED_BY(mu_global_) = false;
   CondVar starting_cv_;
   std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;

src/cpp/server/server_cc.cc
@@ -1110,9 +1110,9 @@ void Server::UnrefAndWaitLocked() {
     shutdown_done_ = true;
     return;  // no need to wait on CV since done condition already set
   }
-  grpc::internal::WaitUntil(
-      &shutdown_done_cv_, &mu_,
-      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return shutdown_done_; });
+  while (!shutdown_done_) {
+    shutdown_done_cv_.Wait(&mu_);
+  }
 }
 void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {

test/core/handshake/client_ssl.cc
@@ -63,13 +63,15 @@ class SslLibraryInfo {
   void Await() {
     grpc_core::MutexLock lock(&mu_);
-    grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; });
+    while (!ready_) {
+      cv_.Wait(&mu_);
+    }
   }
  private:
   grpc_core::Mutex mu_;
   grpc_core::CondVar cv_;
-  bool ready_ = false;
+  bool ready_ ABSL_GUARDED_BY(mu_) = false;
 };
 // Arguments for TLS server thread.

test/core/handshake/server_ssl_common.cc
@@ -86,14 +86,16 @@ class ServerInfo {
   void Await() {
     grpc_core::MutexLock lock(&mu_);
-    grpc_core::WaitUntil(&cv_, &mu_, [this] { return ready_; });
+    while (!ready_) {
+      cv_.Wait(&mu_);
+    }
   }
  private:
   const int port_;
   grpc_core::Mutex mu_;
   grpc_core::CondVar cv_;
-  bool ready_ = false;
+  bool ready_ ABSL_GUARDED_BY(mu_) = false;
 };
 // Simple gRPC server. This listens until client_handshake_complete occurs.

test/cpp/end2end/client_lb_end2end_test.cc
@@ -357,32 +357,33 @@ class ClientLbEnd2endTest : public ::testing::Test {
   }
   struct ServerData {
-    int port_;
+    const int port_;
     std::unique_ptr<Server> server_;
     MyTestServiceImpl service_;
     std::unique_ptr<std::thread> thread_;
-    bool server_ready_ = false;
-    bool started_ = false;
-    explicit ServerData(int port = 0) {
-      port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
-    }
+    grpc::internal::Mutex mu_;
+    grpc::internal::CondVar cond_;
+    bool server_ready_ ABSL_GUARDED_BY(mu_) = false;
+    bool started_ ABSL_GUARDED_BY(mu_) = false;
+    explicit ServerData(int port = 0)
+        : port_(port > 0 ? port : grpc_pick_unused_port_or_die()) {}
     void Start(const std::string& server_host) {
       gpr_log(GPR_INFO, "starting server on port %d", port_);
-      grpc::internal::Mutex mu;
-      grpc::internal::MutexLock lock(&mu);
-      grpc::internal::CondVar cond;
+      grpc::internal::MutexLock lock(&mu_);
+      started_ = true;
       thread_ = absl::make_unique<std::thread>(
-          std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
-      grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; });
+          std::bind(&ServerData::Serve, this, server_host));
+      while (!server_ready_) {
+        cond_.Wait(&mu_);
+      }
       server_ready_ = false;
       gpr_log(GPR_INFO, "server startup complete");
     }
-    void Serve(const std::string& server_host, grpc::internal::Mutex* mu,
-               grpc::internal::CondVar* cond) {
+    void Serve(const std::string& server_host) {
       std::ostringstream server_address;
       server_address << server_host << ":" << port_;
       ServerBuilder builder;
@@ -391,12 +392,13 @@ class ClientLbEnd2endTest : public ::testing::Test {
      builder.AddListeningPort(server_address.str(), std::move(creds));
      builder.RegisterService(&service_);
      server_ = builder.BuildAndStart();
-      grpc::internal::MutexLock lock(mu);
+      grpc::internal::MutexLock lock(&mu_);
      server_ready_ = true;
-      cond->Signal();
+      cond_.Signal();
    }
    void Shutdown() {
+      grpc::internal::MutexLock lock(&mu_);
      if (!started_) return;
      server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
      thread_->join();

test/cpp/end2end/grpclb_end2end_test.cc
@@ -274,8 +274,9 @@ class BalancerServiceImpl : public BalancerService {
     }
     {
       grpc::internal::MutexLock lock(&mu_);
-      grpc::internal::WaitUntil(&serverlist_cond_, &mu_,
-                                [this] { return serverlist_done_; });
+      while (!serverlist_done_) {
+        serverlist_cond_.Wait(&mu_);
+      }
     }
     if (client_load_reporting_interval_seconds_ > 0) {
@@ -304,7 +305,7 @@ class BalancerServiceImpl : public BalancerService {
         // below from firing before its corresponding wait is executed.
         grpc::internal::MutexLock lock(&mu_);
         load_report_queue_.emplace_back(std::move(load_report));
-        if (load_report_cond_ != nullptr) load_report_cond_->Signal();
+        load_report_cond_.Signal();
       }
     }
   }
@@ -332,12 +333,10 @@ class BalancerServiceImpl : public BalancerService {
   ClientStats WaitForLoadReport() {
     grpc::internal::MutexLock lock(&mu_);
-    grpc::internal::CondVar cv;
-    if (load_report_queue_.empty()) {
-      load_report_cond_ = &cv;
-      grpc::internal::WaitUntil(load_report_cond_, &mu_,
-                                [this] { return !load_report_queue_.empty(); });
-      load_report_cond_ = nullptr;
-    }
+    while (load_report_queue_.empty()) {
+      load_report_cond_.Wait(&mu_);
+    }
     ClientStats load_report = std::move(load_report_queue_.front());
     load_report_queue_.pop_front();
@@ -376,9 +375,9 @@ class BalancerServiceImpl : public BalancerService {
   grpc::internal::Mutex mu_;
   grpc::internal::CondVar serverlist_cond_;
-  bool serverlist_done_ = false;
-  grpc::internal::CondVar* load_report_cond_ = nullptr;
-  std::deque<ClientStats> load_report_queue_;
+  bool serverlist_done_ ABSL_GUARDED_BY(mu_) = false;
+  grpc::internal::CondVar load_report_cond_;
+  std::deque<ClientStats> load_report_queue_ ABSL_GUARDED_BY(mu_);
 };
 class GrpclbEnd2endTest : public ::testing::Test {

test/cpp/end2end/mock_test.cc
@@ -29,6 +29,8 @@
 #include <grpcpp/server_context.h>
 #include <grpcpp/test/default_reactor_test_peer.h>
+#include "absl/types/optional.h"
 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
 #include "src/proto/grpc/testing/echo.grpc.pb.h"
 #include "src/proto/grpc/testing/echo_mock.grpc.pb.h"
@@ -191,28 +193,30 @@ TEST_F(MockCallbackTest, MockedCallSucceedsWithWait) {
   CallbackServerContext ctx;
   EchoRequest req;
   EchoResponse resp;
-  grpc::internal::Mutex mu;
-  grpc::internal::CondVar cv;
-  grpc::Status status;
-  bool status_set = false;
+  struct {
+    grpc::internal::Mutex mu;
+    grpc::internal::CondVar cv;
+    absl::optional<grpc::Status> ABSL_GUARDED_BY(mu) status;
+  } status;
   DefaultReactorTestPeer peer(&ctx, [&](::grpc::Status s) {
-    grpc::internal::MutexLock l(&mu);
-    status_set = true;
-    status = std::move(s);
-    cv.Signal();
+    grpc::internal::MutexLock l(&status.mu);
+    status.status = std::move(s);
+    status.cv.Signal();
   });
   req.set_message("mock 1");
   auto* reactor = service_.Echo(&ctx, &req, &resp);
-  grpc::internal::WaitUntil(&cv, &mu, [&] {
-    grpc::internal::MutexLock l(&mu);
-    return status_set;
-  });
+  grpc::internal::MutexLock l(&status.mu);
+  while (!status.status.has_value()) {
+    status.cv.Wait(&status.mu);
+  }
   EXPECT_EQ(reactor, peer.reactor());
   EXPECT_TRUE(peer.test_status_set());
   EXPECT_TRUE(peer.test_status().ok());
-  EXPECT_TRUE(status_set);
-  EXPECT_TRUE(status.ok());
+  EXPECT_TRUE(status.status.has_value());
+  EXPECT_TRUE(status.status.value().ok());
   EXPECT_EQ(req.message(), resp.message());
 }

test/cpp/end2end/service_config_end2end_test.cc
@@ -300,32 +300,33 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
   }
   struct ServerData {
-    int port_;
+    const int port_;
     std::unique_ptr<Server> server_;
     MyTestServiceImpl service_;
     std::unique_ptr<std::thread> thread_;
-    bool server_ready_ = false;
-    bool started_ = false;
-    explicit ServerData(int port = 0) {
-      port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
-    }
+    grpc::internal::Mutex mu_;
+    grpc::internal::CondVar cond_;
+    bool server_ready_ ABSL_GUARDED_BY(mu_) = false;
+    bool started_ ABSL_GUARDED_BY(mu_) = false;
+    explicit ServerData(int port = 0)
+        : port_(port > 0 ? port : grpc_pick_unused_port_or_die()) {}
     void Start(const std::string& server_host) {
      gpr_log(GPR_INFO, "starting server on port %d", port_);
-      grpc::internal::Mutex mu;
-      grpc::internal::MutexLock lock(&mu);
-      grpc::internal::CondVar cond;
+      grpc::internal::MutexLock lock(&mu_);
+      started_ = true;
      thread_ = absl::make_unique<std::thread>(
-          std::bind(&ServerData::Serve, this, server_host, &mu, &cond));
-      grpc::internal::WaitUntil(&cond, &mu, [this] { return server_ready_; });
+          std::bind(&ServerData::Serve, this, server_host));
+      while (!server_ready_) {
+        cond_.Wait(&mu_);
+      }
      server_ready_ = false;
      gpr_log(GPR_INFO, "server startup complete");
    }
-    void Serve(const std::string& server_host, grpc::internal::Mutex* mu,
-               grpc::internal::CondVar* cond) {
+    void Serve(const std::string& server_host) {
      std::ostringstream server_address;
      server_address << server_host << ":" << port_;
      ServerBuilder builder;
@@ -334,12 +335,13 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
      builder.AddListeningPort(server_address.str(), std::move(creds));
      builder.RegisterService(&service_);
      server_ = builder.BuildAndStart();
-      grpc::internal::MutexLock lock(mu);
+      grpc::internal::MutexLock lock(&mu_);
      server_ready_ = true;
-      cond->Signal();
+      cond_.Signal();
    }
    void Shutdown() {
+      grpc::internal::MutexLock lock(&mu_);
      if (!started_) return;
      server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
      thread_->join();

test/cpp/end2end/xds_end2end_test.cc
@@ -1211,8 +1211,9 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
     grpc_core::CondVar cv;
     if (result_queue_.empty()) {
       load_report_cond_ = &cv;
-      grpc_core::WaitUntil(load_report_cond_, &load_report_mu_,
-                           [this] { return !result_queue_.empty(); });
+      while (result_queue_.empty()) {
+        cv.Wait(&load_report_mu_);
+      }
       load_report_cond_ = nullptr;
     }
     std::vector<ClientStats> result = std::move(result_queue_.front());
@@ -1279,8 +1280,9 @@ class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
       }
       // Wait until notified done.
      grpc_core::MutexLock lock(&parent_->lrs_mu_);
-      grpc_core::WaitUntil(&parent_->lrs_cv_, &parent_->lrs_mu_,
-                           [this] { return parent_->lrs_done_; });
+      while (!parent_->lrs_done_) {
+        parent_->lrs_cv_.Wait(&parent_->lrs_mu_);
+      }
     }
     gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
     return Status::OK;
