diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 6cc3716291c..e976e118147 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -72,7 +72,7 @@ class ServerContext { template friend class ::grpc::ServerReader; template friend class ::grpc::ServerWriter; template friend class ::grpc::ServerReaderWriter; - + ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count); const std::chrono::system_clock::time_point deadline_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 52a764bfc4e..6dc05bc9a6b 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -370,6 +370,15 @@ class ClientAsyncStreamingInterface { virtual void Finish(Status* status, void* tag) = 0; }; +class ServerAsyncStreamingInterface { + public: + virtual ~ServerAsyncStreamingInterface() {} + + virtual void SendInitialMetadata(void* tag) = 0; + + virtual void Finish(const Status& status, void* tag) = 0; +}; + // An interface that yields a sequence of R messages. template class AsyncReaderInterface { @@ -577,6 +586,7 @@ class ServerAsyncResponseWriter final { virtual void Write(const W& msg, void* tag) override { CallOpBuffer buf; + buf.Reset(tag); buf.AddSendMessage(msg); call_->PerformOps(&buf); } @@ -586,48 +596,147 @@ class ServerAsyncResponseWriter final { }; template -class ServerAsyncReader : public AsyncReaderInterface { +class ServerAsyncReader : public ServerAsyncStreamingInterface, + public AsyncReaderInterface { public: - explicit ServerAsyncReader(Call* call) : call_(call) {} + ServerAsyncReader(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); - virtual void Read(R* msg, void* tag) { - // TODO + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&meta_buf_); + } + + void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_->PerformOps(&read_buf_); + } + + void Finish(const Status& status, void* tag) override { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_->PerformOps(&finish_buf_); } + private: Call* call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; }; template -class ServerAsyncWriter : public AsyncWriterInterface { +class ServerAsyncWriter : public ServerAsyncStreamingInterface, + public AsyncWriterInterface { public: - explicit ServerAsyncWriter(Call* call) : call_(call) {} + ServerAsyncWriter(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&meta_buf_); + } + + void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + write_buf_.AddSendMessage(msg); + call_->PerformOps(&write_buf_); + } - virtual void Write(const W& msg, void* tag) { - // TODO + void Finish(const Status& status, void* tag) override { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_->PerformOps(&finish_buf_); } private: Call* call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer write_buf_; + CallOpBuffer finish_buf_; }; // Server-side interface for bi-directional streaming. template -class ServerAsyncReaderWriter : public AsyncWriterInterface, - public AsyncReaderInterface { +class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, + public AsyncWriterInterface, + public AsyncReaderInterface { public: - explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} + ServerAsyncReaderWriter(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&meta_buf_); + } + + virtual void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_->PerformOps(&read_buf_); + } - virtual void Read(R* msg, void* tag) { - // TODO + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + write_buf_.AddSendMessage(msg); + call_->PerformOps(&write_buf_); } - virtual void Write(const W& msg, void* tag) { - // TODO + void Finish(const Status& status, void* tag) override { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_->PerformOps(&finish_buf_); } private: Call* call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer finish_buf_; }; } // namespace grpc