From 424bc92e377ad0f2aed23bb4bcde6bb06aa49774 Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Thu, 12 Feb 2015 10:24:39 -0800 Subject: [PATCH 1/2] implement ClientAsyncX api --- include/grpc++/stream.h | 105 ++++++++++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 20 deletions(-) diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 74e7539aa47..52a764bfc4e 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -365,6 +365,8 @@ class ClientAsyncStreamingInterface { public: virtual ~ClientAsyncStreamingInterface() {} + virtual void ReadInitialMetadata(void* tag) = 0; + virtual void Finish(Status* status, void* tag) = 0; }; @@ -390,30 +392,50 @@ template class ClientAsyncReader final : public ClientAsyncStreamingInterface, public AsyncReaderInterface { public: - // Blocking create a stream and write the first request out. + // Create a stream and write the first request out. ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const google::protobuf::Message &request, void* tag) - : call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); init_buf_.AddSendMessage(request); init_buf_.AddClientSendClose(); call_.PerformOps(&init_buf_); } - virtual void Read(R *msg, void* tag) override { + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.Reset(tag); + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + context_->initial_metadata_received_ = true; + } + + void Read(R *msg, void* tag) override { read_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } read_buf_.AddRecvMessage(msg); call_.PerformOps(&read_buf_); } - virtual void Finish(Status* status, void* tag) override { + void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); - finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } private: + ClientContext* context_ = nullptr; CompletionQueue cq_; Call call_; CallOpBuffer init_buf_; @@ -425,37 +447,56 @@ template class ClientAsyncWriter final : public ClientAsyncStreamingInterface, public WriterInterface { public: - // Blocking create a stream. ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, - google::protobuf::Message *response) - : response_(response), - call_(channel->CreateCall(method, context, &cq_)) {} + google::protobuf::Message *response, void* tag) + : context_(context), response_(response), + call_(channel->CreateCall(method, context, &cq_)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&init_buf_); + } - virtual void Write(const W& msg, void* tag) override { + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.Reset(tag); + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + context_->initial_metadata_received_ = true; + } + + void Write(const W& msg, void* tag) override { write_buf_.Reset(tag); write_buf_.AddSendMessage(msg); call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) override { + void WritesDone(void* tag) override { writes_done_buf_.Reset(tag); writes_done_buf_.AddClientSendClose(); call_.PerformOps(&writes_done_buf_); } - virtual void Finish(Status* status, void* tag) override { + void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } finish_buf_.AddRecvMessage(response_, &got_message_); - finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } private: + ClientContext* context_ = nullptr; google::protobuf::Message *const response_; bool got_message_; CompletionQueue cq_; Call call_; + CallOpBuffer init_buf_; CallOpBuffer write_buf_; CallOpBuffer writes_done_buf_; CallOpBuffer finish_buf_; @@ -468,36 +509,60 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, public AsyncReaderInterface { public: ClientAsyncReaderWriter(ChannelInterface *channel, - const RpcMethod &method, ClientContext *context) - : call_(channel->CreateCall(method, context, &cq_)) {} + const RpcMethod &method, ClientContext *context, void* tag) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&init_buf_); + } + + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); - virtual void Read(R *msg, void* tag) override { + CallOpBuffer buf; + buf.Reset(tag); + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + context_->initial_metadata_received_ = true; + } + + void Read(R *msg, void* tag) override { read_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } read_buf_.AddRecvMessage(msg); call_.PerformOps(&read_buf_); } - virtual void Write(const W& msg, void* tag) override { + void Write(const W& msg, void* tag) override { write_buf_.Reset(tag); write_buf_.AddSendMessage(msg); call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) override { + void WritesDone(void* tag) override { writes_done_buf_.Reset(tag); writes_done_buf_.AddClientSendClose(); call_.PerformOps(&writes_done_buf_); } - virtual void Finish(Status* status, void* tag) override { + void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); - finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } private: + ClientContext* context_ = nullptr; CompletionQueue cq_; Call call_; + CallOpBuffer init_buf_; CallOpBuffer read_buf_; CallOpBuffer write_buf_; CallOpBuffer writes_done_buf_; From a38feb9be756ef2fe06dc192b1d74bd015a384ee Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Thu, 12 Feb 2015 12:05:20 -0800 Subject: [PATCH 2/2] Implement async streaming APIs --- include/grpc++/server_context.h | 2 +- include/grpc++/stream.h | 139 ++++++++++++++++++++++++++++---- 2 files changed, 125 insertions(+), 16 deletions(-) diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 6cc3716291c..e976e118147 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -72,7 +72,7 @@ class ServerContext { template friend class ::grpc::ServerReader; template friend class ::grpc::ServerWriter; template friend class ::grpc::ServerReaderWriter; - + ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count); const std::chrono::system_clock::time_point deadline_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 52a764bfc4e..6dc05bc9a6b 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -370,6 +370,15 @@ class ClientAsyncStreamingInterface { virtual void Finish(Status* status, void* tag) = 0; }; +class ServerAsyncStreamingInterface { + public: + virtual ~ServerAsyncStreamingInterface() {} + + virtual void SendInitialMetadata(void* tag) = 0; + + virtual void Finish(const Status& status, void* tag) = 0; +}; + // An interface that yields a sequence of R messages. template class AsyncReaderInterface { @@ -577,6 +586,7 @@ class ServerAsyncResponseWriter final { virtual void Write(const W& msg, void* tag) override { CallOpBuffer buf; + buf.Reset(tag); buf.AddSendMessage(msg); call_->PerformOps(&buf); } @@ -586,48 +596,147 @@ class ServerAsyncResponseWriter final { }; template -class ServerAsyncReader : public AsyncReaderInterface { +class ServerAsyncReader : public ServerAsyncStreamingInterface, + public AsyncReaderInterface { public: - explicit ServerAsyncReader(Call* call) : call_(call) {} + ServerAsyncReader(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); - virtual void Read(R* msg, void* tag) { - // TODO + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&meta_buf_); + } + + void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_->PerformOps(&read_buf_); + } + + void Finish(const Status& status, void* tag) override { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_->PerformOps(&finish_buf_); } + private: Call* call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; }; template -class ServerAsyncWriter : public AsyncWriterInterface { +class ServerAsyncWriter : public ServerAsyncStreamingInterface, + public AsyncWriterInterface { public: - explicit ServerAsyncWriter(Call* call) : call_(call) {} + ServerAsyncWriter(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&meta_buf_); + } + + void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + write_buf_.AddSendMessage(msg); + call_->PerformOps(&write_buf_); + } - virtual void Write(const W& msg, void* tag) { - // TODO + void Finish(const Status& status, void* tag) override { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_->PerformOps(&finish_buf_); } private: Call* call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer write_buf_; + CallOpBuffer finish_buf_; }; // Server-side interface for bi-directional streaming. template -class ServerAsyncReaderWriter : public AsyncWriterInterface, - public AsyncReaderInterface { +class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, + public AsyncWriterInterface, + public AsyncReaderInterface { public: - explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} + ServerAsyncReaderWriter(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) override { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&meta_buf_); + } + + virtual void Read(R* msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_->PerformOps(&read_buf_); + } - virtual void Read(R* msg, void* tag) { - // TODO + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + write_buf_.AddSendMessage(msg); + call_->PerformOps(&write_buf_); } - virtual void Write(const W& msg, void* tag) { - // TODO + void Finish(const Status& status, void* tag) override { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_->PerformOps(&finish_buf_); } private: Call* call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer finish_buf_; }; } // namespace grpc