Use atomics to remove locks in Channel/Server::CallbackCQ() (#26091)

pull/26100/head
Vijay Pai 4 years ago committed by GitHub
parent 9977bef14b
commit dc6948cf35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      include/grpcpp/channel.h
  2. 2
      include/grpcpp/server.h
  3. 25
      src/cpp/client/channel_cc.cc
  4. 39
      src/cpp/server/server_cc.cc

@ -114,7 +114,7 @@ class Channel final : public ::grpc::ChannelInterface,
// with this channel (if any). It is set on the first call to CallbackCQ().
// It is _not owned_ by the channel; ownership belongs with its internal
// shutdown callback tag (invoked when the CQ is fully shutdown).
::grpc::CompletionQueue* callback_cq_ = nullptr;
std::atomic<CompletionQueue*> callback_cq_{nullptr};
std::vector<
std::unique_ptr<::grpc::experimental::ClientInterceptorFactoryInterface>>

@ -380,7 +380,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
// with this server (if any). It is set on the first call to CallbackCQ().
// It is _not owned_ by the server; ownership belongs with its internal
// shutdown callback tag (invoked when the CQ is fully shutdown).
CompletionQueue* callback_cq_ ABSL_GUARDED_BY(mu_) = nullptr;
std::atomic<CompletionQueue*> callback_cq_{nullptr};
// List of CQs passed in by user that must be Shutdown only after Server is
// Shutdown. Even though this is only used with NDEBUG, instantiate it in all

@ -57,12 +57,13 @@ Channel::Channel(const std::string& host, grpc_channel* channel,
Channel::~Channel() {
grpc_channel_destroy(c_channel_);
if (callback_cq_ != nullptr) {
CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
if (callback_cq != nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
callback_cq_->Shutdown();
callback_cq->Shutdown();
} else {
CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
}
}
}
@ -243,25 +244,33 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
::grpc::CompletionQueue* Channel::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-channel CQ registered
CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
if (callback_cq != nullptr) {
return callback_cq;
}
// The callback_cq_ wasn't already set, so grab a lock and set it up exactly
// once for this channel.
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ == nullptr) {
callback_cq = callback_cq_.load(std::memory_order_relaxed);
if (callback_cq == nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
auto* shutdown_callback = new ShutdownCallback;
callback_cq_ =
callback_cq =
new ::grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK,
GRPC_CQ_DEFAULT_POLLING, shutdown_callback});
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
shutdown_callback->TakeCQ(callback_cq);
} else {
// Otherwise we need to use the alternative CQ variant
callback_cq_ = CompletionQueue::CallbackAlternativeCQ();
callback_cq = CompletionQueue::CallbackAlternativeCQ();
}
callback_cq_.store(callback_cq, std::memory_order_release);
}
return callback_cq_;
return callback_cq;
}
} // namespace grpc

@ -930,14 +930,16 @@ Server::~Server() {
for (const auto& value : sync_req_mgrs_) {
value->Shutdown();
}
if (callback_cq_ != nullptr) {
CompletionQueue* callback_cq =
callback_cq_.load(std::memory_order_relaxed);
if (callback_cq != nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
callback_cq_->Shutdown();
callback_cq->Shutdown();
} else {
CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
}
callback_cq_ = nullptr;
callback_cq_.store(nullptr, std::memory_order_release);
}
}
}
@ -1256,14 +1258,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
// will delete itself at true shutdown.
if (callback_cq_ != nullptr) {
CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
if (callback_cq != nullptr) {
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
callback_cq_->Shutdown();
callback_cq->Shutdown();
} else {
CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq_);
CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
}
callback_cq_ = nullptr;
callback_cq_.store(nullptr, std::memory_order_release);
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
@ -1330,25 +1333,33 @@ grpc::ServerInitializer* Server::initializer() {
grpc::CompletionQueue* Server::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-server CQ registered
CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
if (callback_cq != nullptr) {
return callback_cq;
}
// The callback_cq_ wasn't already set, so grab a lock and set it up exactly
// once for this server.
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ != nullptr) {
return callback_cq_;
callback_cq = callback_cq_.load(std::memory_order_relaxed);
if (callback_cq != nullptr) {
return callback_cq;
}
if (grpc_iomgr_run_in_background()) {
// gRPC-core provides the backing needed for the preferred CQ type
auto* shutdown_callback = new grpc::ShutdownCallback;
callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{
callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});
// Transfer ownership of the new cq to its own shutdown callback
shutdown_callback->TakeCQ(callback_cq_);
shutdown_callback->TakeCQ(callback_cq);
} else {
// Otherwise we need to use the alternative CQ variant
callback_cq_ = CompletionQueue::CallbackAlternativeCQ();
callback_cq = CompletionQueue::CallbackAlternativeCQ();
}
return callback_cq_;
callback_cq_.store(callback_cq, std::memory_order_release);
return callback_cq;
}
} // namespace grpc

Loading…
Cancel
Save