diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h index 5e67c642a47..65cc46d151f 100644 --- a/include/grpcpp/channel.h +++ b/include/grpcpp/channel.h @@ -114,7 +114,7 @@ class Channel final : public ::grpc::ChannelInterface, // with this channel (if any). It is set on the first call to CallbackCQ(). // It is _not owned_ by the channel; ownership belongs with its internal // shutdown callback tag (invoked when the CQ is fully shutdown). - ::grpc::CompletionQueue* callback_cq_ = nullptr; + std::atomic callback_cq_{nullptr}; std::vector< std::unique_ptr<::grpc::experimental::ClientInterceptorFactoryInterface>> diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 7f367dc5988..a69e64b462f 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -380,7 +380,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { // with this server (if any). It is set on the first call to CallbackCQ(). // It is _not owned_ by the server; ownership belongs with its internal // shutdown callback tag (invoked when the CQ is fully shutdown). - CompletionQueue* callback_cq_ ABSL_GUARDED_BY(mu_) = nullptr; + std::atomic callback_cq_{nullptr}; // List of CQs passed in by user that must be Shutdown only after Server is // Shutdown. Even though this is only used with NDEBUG, instantiate it in all diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 56dee0b769a..a9f208394ec 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -57,12 +57,13 @@ Channel::Channel(const std::string& host, grpc_channel* channel, Channel::~Channel() { grpc_channel_destroy(c_channel_); - if (callback_cq_ != nullptr) { + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type - callback_cq_->Shutdown(); + callback_cq->Shutdown(); } else { - CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); } } } @@ -243,25 +244,33 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { ::grpc::CompletionQueue* Channel::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-channel CQ registered + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire); + if (callback_cq != nullptr) { + return callback_cq; + } + // The callback_cq_ wasn't already set, so grab a lock and set it up exactly + // once for this channel. grpc::internal::MutexLock l(&mu_); - if (callback_cq_ == nullptr) { + callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq == nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type auto* shutdown_callback = new ShutdownCallback; - callback_cq_ = + callback_cq = new ::grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + shutdown_callback->TakeCQ(callback_cq); } else { // Otherwise we need to use the alternative CQ variant - callback_cq_ = CompletionQueue::CallbackAlternativeCQ(); + callback_cq = CompletionQueue::CallbackAlternativeCQ(); } + callback_cq_.store(callback_cq, std::memory_order_release); } - return callback_cq_; + return callback_cq; } } // namespace grpc diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 2844bfa5137..d8f5af5e35f 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -930,14 +930,16 @@ Server::~Server() { for (const auto& value : sync_req_mgrs_) { value->Shutdown(); } - if (callback_cq_ != nullptr) { + CompletionQueue* callback_cq = + callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type - callback_cq_->Shutdown(); + callback_cq->Shutdown(); } else { - CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); } - callback_cq_ = nullptr; + callback_cq_.store(nullptr, std::memory_order_release); } } } @@ -1256,14 +1258,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. - if (callback_cq_ != nullptr) { + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type - callback_cq_->Shutdown(); + callback_cq->Shutdown(); } else { - CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_); + CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq); } - callback_cq_ = nullptr; + callback_cq_.store(nullptr, std::memory_order_release); } // Drain the shutdown queue (if the previous call to AsyncNext() timed out @@ -1330,25 +1333,33 @@ grpc::ServerInitializer* Server::initializer() { grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered + CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire); + if (callback_cq != nullptr) { + return callback_cq; + } + // The callback_cq_ wasn't already set, so grab a lock and set it up exactly + // once for this server. grpc::internal::MutexLock l(&mu_); - if (callback_cq_ != nullptr) { - return callback_cq_; + callback_cq = callback_cq_.load(std::memory_order_relaxed); + if (callback_cq != nullptr) { + return callback_cq; } if (grpc_iomgr_run_in_background()) { // gRPC-core provides the backing needed for the preferred CQ type auto* shutdown_callback = new grpc::ShutdownCallback; - callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ + callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + shutdown_callback->TakeCQ(callback_cq); } else { // Otherwise we need to use the alternative CQ variant - callback_cq_ = CompletionQueue::CallbackAlternativeCQ(); + callback_cq = CompletionQueue::CallbackAlternativeCQ(); } - return callback_cq_; + callback_cq_.store(callback_cq, std::memory_order_release); + return callback_cq; } } // namespace grpc