merge init_ops, writes_done_ops and write_ops

pull/9964/head
Yuxuan Li 8 years ago
parent 0aac9d06e2
commit b39eeac79e
  1. 118
      include/grpc++/impl/codegen/async_stream.h

@ -222,10 +222,10 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
write_ops_.SendInitialMetadata(context->send_initial_metadata_, write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags()); context->initial_metadata_flags());
} else { } else {
init_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_, write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags()); context->initial_metadata_flags());
call_.PerformOps(&init_ops_); call_.PerformOps(&write_ops_);
} }
} }
@ -256,9 +256,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
} }
void WritesDone(void* tag) override { void WritesDone(void* tag) override {
writes_done_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
writes_done_ops_.ClientSendClose(); write_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_); call_.PerformOps(&write_ops_);
} }
void Finish(Status* status, void* tag) override { void Finish(Status* status, void* tag) override {
@ -273,11 +273,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
write_ops_; write_ops_;
CallOpSet<CallOpClientSendClose> writes_done_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
CallOpClientRecvStatus> CallOpClientRecvStatus>
finish_ops_; finish_ops_;
@ -310,10 +308,10 @@ class ClientAsyncReaderWriter final
write_ops_.SendInitialMetadata(context->send_initial_metadata_, write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags()); context->initial_metadata_flags());
} else { } else {
init_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_, write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags()); context->initial_metadata_flags());
call_.PerformOps(&init_ops_); call_.PerformOps(&write_ops_);
} }
} }
@ -370,7 +368,6 @@ class ClientAsyncReaderWriter final
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
@ -454,9 +451,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
ServerContext* ctx_; ServerContext* ctx_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
CallOpServerSendStatus>
finish_ops_;
}; };
template <class W> template <class W>
@ -499,16 +494,20 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
call_.PerformOps(&meta_ops_); call_.PerformOps(&meta_ops_);
} }
void Write(const W& msg, void* tag) override { void EnsureInitialMetadataSent(CallOpSetInterface* ops) {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_, ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags()); ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) { if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level()); ops.set_compression_level(ctx_->compression_level());
} }
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
}
void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag);
EnsureInitialMetadataSent(&write_ops_);
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
@ -520,14 +519,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
options.set_buffer_hint(); options.set_buffer_hint();
} }
if (!ctx_->sent_initial_metadata_) { EnsureInitialMetadataSent(&write_ops_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
@ -536,14 +528,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
void* tag) override { void* tag) override {
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { EnsureInitialMetadataSent(&write_ops_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
options.set_buffer_hint(); options.set_buffer_hint();
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -552,14 +537,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
void Finish(const Status& status, void* tag) override { void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { EnsureInitialMetadataSent(&finish_ops_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_); call_.PerformOps(&finish_ops_);
} }
@ -619,6 +597,17 @@ class ServerAsyncReaderWriter final
call_.PerformOps(&meta_ops_); call_.PerformOps(&meta_ops_);
} }
void EnsureInitialMetadataSent(CallOpSetInterface* ops) {
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
ops.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
}
void Read(R* msg, void* tag) override { void Read(R* msg, void* tag) override {
read_ops_.set_output_tag(tag); read_ops_.set_output_tag(tag);
read_ops_.RecvMessage(msg); read_ops_.RecvMessage(msg);
@ -627,14 +616,7 @@ class ServerAsyncReaderWriter final
void Write(const W& msg, void* tag) override { void Write(const W& msg, void* tag) override {
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { EnsureInitialMetadataSent(&write_ops_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
@ -645,14 +627,7 @@ class ServerAsyncReaderWriter final
if (options.is_last_message()) { if (options.is_last_message()) {
options.set_buffer_hint(); options.set_buffer_hint();
} }
if (!ctx_->sent_initial_metadata_) { EnsureInitialMetadataSent(&write_ops_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
} }
@ -660,14 +635,7 @@ class ServerAsyncReaderWriter final
void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
void* tag) override { void* tag) override {
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { EnsureInitialMetadataSent(&write_ops_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
write_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
options.set_buffer_hint(); options.set_buffer_hint();
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -676,14 +644,8 @@ class ServerAsyncReaderWriter final
void Finish(const Status& status, void* tag) override { void Finish(const Status& status, void* tag) override {
finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { EnsureInitialMetadataSent(&finish_ops_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
finish_ops_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_); call_.PerformOps(&finish_ops_);
} }

Loading…
Cancel
Save