Fix server backlog processing to not set stream until backlog cleared

pull/24177/head
Vijay Pai 4 years ago
parent 83c3fe5d7e
commit b427807f3b
  1. 90
      include/grpcpp/impl/codegen/server_callback.h

@ -436,33 +436,29 @@ class ServerBidiReactor : public internal::ServerReactor {
// customization point.
virtual void InternalBindStream(
ServerCallbackReaderWriter<Request, Response>* stream) {
// TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
// a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
// has stream, then std::get<PreBindBacklog> out of that after the lock.
// Do likewise with the remaining InternalBind* functions as well.
grpc::internal::ReleasableMutexLock l(&stream_mu_);
PreBindBacklog ops(std::move(backlog_));
stream_.store(stream, std::memory_order_release);
l.Unlock();
grpc::internal::MutexLock l(&stream_mu_);
if (ops.send_initial_metadata_wanted) {
if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
stream->SendInitialMetadata();
}
if (ops.read_wanted != nullptr) {
stream->Read(ops.read_wanted);
if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
stream->Read(backlog_.read_wanted);
}
if (ops.write_and_finish_wanted) {
stream->WriteAndFinish(ops.write_wanted,
std::move(ops.write_options_wanted),
std::move(ops.status_wanted));
if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
stream->WriteAndFinish(backlog_.write_wanted,
std::move(backlog_.write_options_wanted),
std::move(backlog_.status_wanted));
} else {
if (ops.write_wanted != nullptr) {
stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
stream->Write(backlog_.write_wanted,
std::move(backlog_.write_options_wanted));
}
if (ops.finish_wanted) {
stream->Finish(std::move(ops.status_wanted));
if (GPR_UNLIKELY(backlog_.finish_wanted)) {
stream->Finish(std::move(backlog_.status_wanted));
}
}
// Set stream_ last so that other functions can use it lock-free
stream_.store(stream, std::memory_order_release);
}
grpc::internal::Mutex stream_mu_;
@ -544,20 +540,19 @@ class ServerReadReactor : public internal::ServerReactor {
// May be overridden by internal implementation details. This is not a public
// customization point.
virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
grpc::internal::ReleasableMutexLock l(&reader_mu_);
PreBindBacklog ops(std::move(backlog_));
reader_.store(reader, std::memory_order_release);
l.Unlock();
grpc::internal::MutexLock l(&reader_mu_);
if (ops.send_initial_metadata_wanted) {
if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
reader->SendInitialMetadata();
}
if (ops.read_wanted != nullptr) {
reader->Read(ops.read_wanted);
if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
reader->Read(backlog_.read_wanted);
}
if (ops.finish_wanted) {
reader->Finish(std::move(ops.status_wanted));
if (GPR_UNLIKELY(backlog_.finish_wanted)) {
reader->Finish(std::move(backlog_.status_wanted));
}
// Set reader_ last so that other functions can use it lock-free
reader_.store(reader, std::memory_order_release);
}
grpc::internal::Mutex reader_mu_;
@ -655,26 +650,26 @@ class ServerWriteReactor : public internal::ServerReactor {
// May be overridden by internal implementation details. This is not a public
// customization point.
virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
grpc::internal::ReleasableMutexLock l(&writer_mu_);
PreBindBacklog ops(std::move(backlog_));
writer_.store(writer, std::memory_order_release);
l.Unlock();
grpc::internal::MutexLock l(&writer_mu_);
if (ops.send_initial_metadata_wanted) {
if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
writer->SendInitialMetadata();
}
if (ops.write_and_finish_wanted) {
writer->WriteAndFinish(ops.write_wanted,
std::move(ops.write_options_wanted),
std::move(ops.status_wanted));
if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
writer->WriteAndFinish(backlog_.write_wanted,
std::move(backlog_.write_options_wanted),
std::move(backlog_.status_wanted));
} else {
if (ops.write_wanted != nullptr) {
writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
writer->Write(backlog_.write_wanted,
std::move(backlog_.write_options_wanted));
}
if (ops.finish_wanted) {
writer->Finish(std::move(ops.status_wanted));
if (GPR_UNLIKELY(backlog_.finish_wanted)) {
writer->Finish(std::move(backlog_.status_wanted));
}
}
// Set writer_ last so that other functions can use it lock-free
writer_.store(writer, std::memory_order_release);
}
grpc::internal::Mutex writer_mu_;
@ -735,17 +730,16 @@ class ServerUnaryReactor : public internal::ServerReactor {
// May be overridden by internal implementation details. This is not a public
// customization point.
virtual void InternalBindCall(ServerCallbackUnary* call) {
grpc::internal::ReleasableMutexLock l(&call_mu_);
PreBindBacklog ops(std::move(backlog_));
call_.store(call, std::memory_order_release);
l.Unlock();
grpc::internal::MutexLock l(&call_mu_);
if (ops.send_initial_metadata_wanted) {
if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
call->SendInitialMetadata();
}
if (ops.finish_wanted) {
call->Finish(std::move(ops.status_wanted));
if (GPR_UNLIKELY(backlog_.finish_wanted)) {
call->Finish(std::move(backlog_.status_wanted));
}
// Set call_ last so that other functions can use it lock-free
call_.store(call, std::memory_order_release);
}
grpc::internal::Mutex call_mu_;

Loading…
Cancel
Save