|
|
|
@ -589,27 +589,27 @@ class ServerWriter final : public ServerWriterInterface<W> { |
|
|
|
|
if (options.is_last_message()) { |
|
|
|
|
options.set_buffer_hint(); |
|
|
|
|
} |
|
|
|
|
if (!ctx_->hanging_ops_.SendMessage(msg, options).ok()) { |
|
|
|
|
if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
ctx_->hanging_ops_.SendInitialMetadata(ctx_->initial_metadata_, |
|
|
|
|
ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, |
|
|
|
|
ctx_->initial_metadata_flags()); |
|
|
|
|
if (ctx_->compression_level_set()) { |
|
|
|
|
ctx_->hanging_ops_.set_compression_level(ctx_->compression_level()); |
|
|
|
|
ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); |
|
|
|
|
} |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
call_->PerformOps(&ctx_->hanging_ops_); |
|
|
|
|
call_->PerformOps(&ctx_->pending_ops_); |
|
|
|
|
// if this is the last message we defer the pluck until AFTER we start
|
|
|
|
|
// the trailing md op. This prevents hangs. See
|
|
|
|
|
// https://github.com/grpc/grpc/issues/11546
|
|
|
|
|
if (options.is_last_message()) { |
|
|
|
|
ctx_->has_hanging_ops_ = true; |
|
|
|
|
ctx_->has_pending_ops_ = true; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
ctx_->has_hanging_ops_ = false; |
|
|
|
|
return call_->cq()->Pluck(&ctx_->hanging_ops_); |
|
|
|
|
ctx_->has_pending_ops_ = false; |
|
|
|
|
return call_->cq()->Pluck(&ctx_->pending_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -661,26 +661,27 @@ class ServerReaderWriterBody final { |
|
|
|
|
if (options.is_last_message()) { |
|
|
|
|
options.set_buffer_hint(); |
|
|
|
|
} |
|
|
|
|
if (!ctx_->hanging_ops_.SendMessage(msg, options).ok()) { |
|
|
|
|
if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
ctx_->hanging_ops_.SendInitialMetadata(ctx_->initial_metadata_, |
|
|
|
|
ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, |
|
|
|
|
ctx_->initial_metadata_flags()); |
|
|
|
|
if (ctx_->compression_level_set()) { |
|
|
|
|
ctx_->hanging_ops_.set_compression_level(ctx_->compression_level()); |
|
|
|
|
ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); |
|
|
|
|
} |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
call_->PerformOps(&ctx_->hanging_ops_); |
|
|
|
|
call_->PerformOps(&ctx_->pending_ops_); |
|
|
|
|
// if this is the last message we defer the pluck until AFTER we start
|
|
|
|
|
// the trailing md op. This prevents hangs. See
|
|
|
|
|
// https://github.com/grpc/grpc/issues/11546
|
|
|
|
|
if (options.is_last_message()) { |
|
|
|
|
ctx_->has_hanging_ops_ = true; |
|
|
|
|
ctx_->has_pending_ops_ = true; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
return call_->cq()->Pluck(&ctx_->hanging_ops_); |
|
|
|
|
ctx_->has_pending_ops_ = false; |
|
|
|
|
return call_->cq()->Pluck(&ctx_->pending_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|