Merge pull request #21443 from vjpai/fix_bug_21210

Move operations out of lock in Bind functions
pull/21464/head
Vijay Pai 5 years ago committed by GitHub
commit 377a32c0ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 226
      include/grpcpp/impl/codegen/server_callback_impl.h

@ -240,7 +240,7 @@ class ServerBidiReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&stream_mu_); grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed); stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) { if (stream == nullptr) {
send_initial_metadata_wanted_ = true; backlog_.send_initial_metadata_wanted = true;
return; return;
} }
} }
@ -258,7 +258,7 @@ class ServerBidiReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&stream_mu_); grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed); stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) { if (stream == nullptr) {
read_wanted_ = req; backlog_.read_wanted = req;
return; return;
} }
} }
@ -287,8 +287,8 @@ class ServerBidiReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&stream_mu_); grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed); stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) { if (stream == nullptr) {
write_wanted_ = resp; backlog_.write_wanted = resp;
write_options_wanted_ = std::move(options); backlog_.write_options_wanted = std::move(options);
return; return;
} }
} }
@ -316,10 +316,10 @@ class ServerBidiReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&stream_mu_); grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed); stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) { if (stream == nullptr) {
write_and_finish_wanted_ = true; backlog_.write_and_finish_wanted = true;
write_wanted_ = resp; backlog_.write_wanted = resp;
write_options_wanted_ = std::move(options); backlog_.write_options_wanted = std::move(options);
status_wanted_ = std::move(s); backlog_.status_wanted = std::move(s);
return; return;
} }
} }
@ -351,8 +351,8 @@ class ServerBidiReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&stream_mu_); grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed); stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) { if (stream == nullptr) {
finish_wanted_ = true; backlog_.finish_wanted = true;
status_wanted_ = std::move(s); backlog_.status_wanted = std::move(s);
return; return;
} }
} }
@ -396,52 +396,51 @@ class ServerBidiReactor : public internal::ServerReactor {
// customization point. // customization point.
virtual void InternalBindStream( virtual void InternalBindStream(
ServerCallbackReaderWriter<Request, Response>* stream) { ServerCallbackReaderWriter<Request, Response>* stream) {
// TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
// a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
// has stream, then std::get<PreBindBacklog> out of that after the lock.
// Do likewise with the remaining InternalBind* functions as well.
grpc::internal::ReleasableMutexLock l(&stream_mu_); grpc::internal::ReleasableMutexLock l(&stream_mu_);
PreBindBacklog ops(std::move(backlog_));
stream_.store(stream, std::memory_order_release); stream_.store(stream, std::memory_order_release);
if (send_initial_metadata_wanted_) { l.Unlock();
if (ops.send_initial_metadata_wanted) {
stream->SendInitialMetadata(); stream->SendInitialMetadata();
send_initial_metadata_wanted_ = false;
} }
if (read_wanted_ != nullptr) { if (ops.read_wanted != nullptr) {
stream->Read(read_wanted_); stream->Read(ops.read_wanted);
read_wanted_ = nullptr;
} }
if (write_and_finish_wanted_) { if (ops.write_and_finish_wanted) {
// Don't perform actual finish actions while holding lock since it could stream->WriteAndFinish(ops.write_wanted,
// trigger OnDone that destroys this object including the still-held lock. std::move(ops.write_options_wanted),
write_and_finish_wanted_ = false; std::move(ops.status_wanted));
const Response* write_wanted = write_wanted_;
::grpc::WriteOptions write_options_wanted =
std::move(write_options_wanted_);
::grpc::Status status_wanted = std::move(status_wanted_);
l.Unlock();
stream->WriteAndFinish(write_wanted, std::move(write_options_wanted),
std::move(status_wanted));
return;
} else { } else {
if (write_wanted_ != nullptr) { if (ops.write_wanted != nullptr) {
stream->Write(write_wanted_, std::move(write_options_wanted_)); stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
write_wanted_ = nullptr;
} }
if (finish_wanted_) { if (ops.finish_wanted) {
finish_wanted_ = false; stream->Finish(std::move(ops.status_wanted));
::grpc::Status status_wanted = std::move(status_wanted_);
l.Unlock();
stream->Finish(std::move(status_wanted));
return;
} }
} }
} }
grpc::internal::Mutex stream_mu_; grpc::internal::Mutex stream_mu_;
std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_; // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
bool send_initial_metadata_wanted_ /* GUARDED_BY(stream_mu_) */ = false; // once C++17 or ABSL is supported since stream and backlog are
bool write_and_finish_wanted_ /* GUARDED_BY(stream_mu_) */ = false; // mutually exclusive in this class. Do likewise with the
bool finish_wanted_ /* GUARDED_BY(stream_mu_) */ = false; // remaining reactor classes and their backlogs as well.
Request* read_wanted_ /* GUARDED_BY(stream_mu_) */ = nullptr; std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
const Response* write_wanted_ /* GUARDED_BY(stream_mu_) */ = nullptr; struct PreBindBacklog {
::grpc::WriteOptions write_options_wanted_ /* GUARDED_BY(stream_mu_) */; bool send_initial_metadata_wanted = false;
::grpc::Status status_wanted_ /* GUARDED_BY(stream_mu_) */; bool write_and_finish_wanted = false;
bool finish_wanted = false;
Request* read_wanted = nullptr;
const Response* write_wanted = nullptr;
::grpc::WriteOptions write_options_wanted;
::grpc::Status status_wanted;
};
PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
}; };
/// \a ServerReadReactor is the interface for a client-streaming RPC. /// \a ServerReadReactor is the interface for a client-streaming RPC.
@ -459,7 +458,7 @@ class ServerReadReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&reader_mu_); grpc::internal::MutexLock l(&reader_mu_);
reader = reader_.load(std::memory_order_relaxed); reader = reader_.load(std::memory_order_relaxed);
if (reader == nullptr) { if (reader == nullptr) {
send_initial_metadata_wanted_ = true; backlog_.send_initial_metadata_wanted = true;
return; return;
} }
} }
@ -472,7 +471,7 @@ class ServerReadReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&reader_mu_); grpc::internal::MutexLock l(&reader_mu_);
reader = reader_.load(std::memory_order_relaxed); reader = reader_.load(std::memory_order_relaxed);
if (reader == nullptr) { if (reader == nullptr) {
read_wanted_ = req; backlog_.read_wanted = req;
return; return;
} }
} }
@ -485,8 +484,8 @@ class ServerReadReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&reader_mu_); grpc::internal::MutexLock l(&reader_mu_);
reader = reader_.load(std::memory_order_relaxed); reader = reader_.load(std::memory_order_relaxed);
if (reader == nullptr) { if (reader == nullptr) {
finish_wanted_ = true; backlog_.finish_wanted = true;
status_wanted_ = std::move(s); backlog_.status_wanted = std::move(s);
return; return;
} }
} }
@ -506,30 +505,30 @@ class ServerReadReactor : public internal::ServerReactor {
// customization point. // customization point.
virtual void InternalBindReader(ServerCallbackReader<Request>* reader) { virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
grpc::internal::ReleasableMutexLock l(&reader_mu_); grpc::internal::ReleasableMutexLock l(&reader_mu_);
PreBindBacklog ops(std::move(backlog_));
reader_.store(reader, std::memory_order_release); reader_.store(reader, std::memory_order_release);
if (send_initial_metadata_wanted_) { l.Unlock();
if (ops.send_initial_metadata_wanted) {
reader->SendInitialMetadata(); reader->SendInitialMetadata();
send_initial_metadata_wanted_ = false;
} }
if (read_wanted_ != nullptr) { if (ops.read_wanted != nullptr) {
reader->Read(read_wanted_); reader->Read(ops.read_wanted);
read_wanted_ = nullptr;
} }
if (finish_wanted_) { if (ops.finish_wanted) {
finish_wanted_ = false; reader->Finish(std::move(ops.status_wanted));
::grpc::Status status_wanted = std::move(status_wanted_);
l.Unlock();
reader->Finish(std::move(status_wanted));
return;
} }
} }
grpc::internal::Mutex reader_mu_; grpc::internal::Mutex reader_mu_;
std::atomic<ServerCallbackReader<Request>*> reader_; std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
bool send_initial_metadata_wanted_ /* GUARDED_BY(reader_mu_) */ = false; struct PreBindBacklog {
bool finish_wanted_ /* GUARDED_BY(reader_mu_) */ = false; bool send_initial_metadata_wanted = false;
Request* read_wanted_ /* GUARDED_BY(reader_mu_) */ = nullptr; bool finish_wanted = false;
::grpc::Status status_wanted_ /* GUARDED_BY(reader_mu_) */; Request* read_wanted = nullptr;
::grpc::Status status_wanted;
};
PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
}; };
/// \a ServerWriteReactor is the interface for a server-streaming RPC. /// \a ServerWriteReactor is the interface for a server-streaming RPC.
@ -547,7 +546,7 @@ class ServerWriteReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&writer_mu_); grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed); writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) { if (writer == nullptr) {
send_initial_metadata_wanted_ = true; backlog_.send_initial_metadata_wanted = true;
return; return;
} }
} }
@ -563,8 +562,8 @@ class ServerWriteReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&writer_mu_); grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed); writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) { if (writer == nullptr) {
write_wanted_ = resp; backlog_.write_wanted = resp;
write_options_wanted_ = std::move(options); backlog_.write_options_wanted = std::move(options);
return; return;
} }
} }
@ -578,10 +577,10 @@ class ServerWriteReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&writer_mu_); grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed); writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) { if (writer == nullptr) {
write_and_finish_wanted_ = true; backlog_.write_and_finish_wanted = true;
write_wanted_ = resp; backlog_.write_wanted = resp;
write_options_wanted_ = std::move(options); backlog_.write_options_wanted = std::move(options);
status_wanted_ = std::move(s); backlog_.status_wanted = std::move(s);
return; return;
} }
} }
@ -597,8 +596,8 @@ class ServerWriteReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&writer_mu_); grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed); writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) { if (writer == nullptr) {
finish_wanted_ = true; backlog_.finish_wanted = true;
status_wanted_ = std::move(s); backlog_.status_wanted = std::move(s);
return; return;
} }
} }
@ -617,44 +616,38 @@ class ServerWriteReactor : public internal::ServerReactor {
// customization point. // customization point.
virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) { virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
grpc::internal::ReleasableMutexLock l(&writer_mu_); grpc::internal::ReleasableMutexLock l(&writer_mu_);
PreBindBacklog ops(std::move(backlog_));
writer_.store(writer, std::memory_order_release); writer_.store(writer, std::memory_order_release);
if (send_initial_metadata_wanted_) { l.Unlock();
if (ops.send_initial_metadata_wanted) {
writer->SendInitialMetadata(); writer->SendInitialMetadata();
send_initial_metadata_wanted_ = false;
} }
if (write_and_finish_wanted_) { if (ops.write_and_finish_wanted) {
write_and_finish_wanted_ = false; writer->WriteAndFinish(ops.write_wanted,
const Response* write_wanted = write_wanted_; std::move(ops.write_options_wanted),
::grpc::WriteOptions write_options_wanted = std::move(ops.status_wanted));
std::move(write_options_wanted_);
::grpc::Status status_wanted = std::move(status_wanted_);
l.Unlock();
writer->WriteAndFinish(write_wanted, std::move(write_options_wanted),
std::move(status_wanted));
return;
} else { } else {
if (write_wanted_ != nullptr) { if (ops.write_wanted != nullptr) {
writer->Write(write_wanted_, std::move(write_options_wanted_)); writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
write_wanted_ = nullptr;
} }
if (finish_wanted_) { if (ops.finish_wanted) {
finish_wanted_ = false; writer->Finish(std::move(ops.status_wanted));
::grpc::Status status_wanted = std::move(status_wanted_);
l.Unlock();
writer->Finish(std::move(status_wanted));
return;
} }
} }
} }
grpc::internal::Mutex writer_mu_; grpc::internal::Mutex writer_mu_;
std::atomic<ServerCallbackWriter<Response>*> writer_; std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
bool send_initial_metadata_wanted_ /* GUARDED_BY(writer_mu_) */ = false; struct PreBindBacklog {
bool write_and_finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false; bool send_initial_metadata_wanted = false;
bool finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false; bool write_and_finish_wanted = false;
const Response* write_wanted_ /* GUARDED_BY(writer_mu_) */ = nullptr; bool finish_wanted = false;
::grpc::WriteOptions write_options_wanted_ /* GUARDED_BY(writer_mu_) */; const Response* write_wanted = nullptr;
::grpc::Status status_wanted_ /* GUARDED_BY(writer_mu_) */; ::grpc::WriteOptions write_options_wanted;
::grpc::Status status_wanted;
};
PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
}; };
class ServerUnaryReactor : public internal::ServerReactor { class ServerUnaryReactor : public internal::ServerReactor {
@ -669,7 +662,7 @@ class ServerUnaryReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&call_mu_); grpc::internal::MutexLock l(&call_mu_);
call = call_.load(std::memory_order_relaxed); call = call_.load(std::memory_order_relaxed);
if (call == nullptr) { if (call == nullptr) {
send_initial_metadata_wanted_ = true; backlog_.send_initial_metadata_wanted = true;
return; return;
} }
} }
@ -681,8 +674,8 @@ class ServerUnaryReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&call_mu_); grpc::internal::MutexLock l(&call_mu_);
call = call_.load(std::memory_order_relaxed); call = call_.load(std::memory_order_relaxed);
if (call == nullptr) { if (call == nullptr) {
finish_wanted_ = true; backlog_.finish_wanted = true;
status_wanted_ = std::move(s); backlog_.status_wanted = std::move(s);
return; return;
} }
} }
@ -700,25 +693,26 @@ class ServerUnaryReactor : public internal::ServerReactor {
// customization point. // customization point.
virtual void InternalBindCall(ServerCallbackUnary* call) { virtual void InternalBindCall(ServerCallbackUnary* call) {
grpc::internal::ReleasableMutexLock l(&call_mu_); grpc::internal::ReleasableMutexLock l(&call_mu_);
PreBindBacklog ops(std::move(backlog_));
call_.store(call, std::memory_order_release); call_.store(call, std::memory_order_release);
if (send_initial_metadata_wanted_) { l.Unlock();
if (ops.send_initial_metadata_wanted) {
call->SendInitialMetadata(); call->SendInitialMetadata();
send_initial_metadata_wanted_ = false;
} }
if (finish_wanted_) { if (ops.finish_wanted) {
finish_wanted_ = false; call->Finish(std::move(ops.status_wanted));
::grpc::Status status_wanted = std::move(status_wanted_);
l.Unlock();
call->Finish(std::move(status_wanted));
return;
} }
} }
grpc::internal::Mutex call_mu_; grpc::internal::Mutex call_mu_;
std::atomic<ServerCallbackUnary*> call_; std::atomic<ServerCallbackUnary*> call_{nullptr};
bool send_initial_metadata_wanted_ /* GUARDED_BY(writer_mu_) */ = false; struct PreBindBacklog {
bool finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false; bool send_initial_metadata_wanted = false;
::grpc::Status status_wanted_ /* GUARDED_BY(writer_mu_) */; bool finish_wanted = false;
::grpc::Status status_wanted;
};
PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
}; };
namespace internal { namespace internal {

Loading…
Cancel
Save