From 75ec2b191cbd94ff0eb3f1247d66e3991aa97000 Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Mon, 9 Feb 2015 16:03:41 -0800 Subject: [PATCH] more implementation and all async signatures --- include/grpc++/stream.h | 241 +++++++++++++++++++++++++++++++--------- 1 file changed, 186 insertions(+), 55 deletions(-) diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 4d4581d00f0..cd416f853b3 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -146,6 +146,7 @@ class ClientWriter final : public ClientStreamingInterface, virtual Status Finish() override { CallOpBuffer buf; Status status; + buf.AddRecvMessage(response_); buf.AddClientRecvStatus(&status); call_.PerformOps(&buf, (void *)4); GPR_ASSERT(cq_.Pluck((void *)4)); @@ -207,125 +208,255 @@ class ClientReaderWriter final : public ClientStreamingInterface, template class ServerReader final : public ReaderInterface { public: - ServerReader(CompletionQueue* cq, Call* call) : cq_(cq), call_(call) {} + explicit ServerReader(Call* call) : call_(call) {} virtual bool Read(R* msg) override { CallOpBuffer buf; buf.AddRecvMessage(msg); call_->PerformOps(&buf, (void *)2); - return cq_->Pluck((void *)2); + return call_->cq()->Pluck((void *)2); } private: - CompletionQueue* cq_; Call* call_; }; template -class ServerWriter : public WriterInterface { +class ServerWriter final : public WriterInterface { public: - explicit ServerWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); - } + explicit ServerWriter(Call* call) : call_(call) {} - virtual bool Write(const W& msg) { - return context_->Write(const_cast(&msg), false); + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf, (void *)2); + return call_->cq()->Pluck((void *)2); } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; // Server-side interface for bi-directional streaming. template -class ServerReaderWriter : public WriterInterface, +class ServerReaderWriter final : public WriterInterface, public ReaderInterface { public: - explicit ServerReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + explicit ServerReaderWriter(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf, (void *)2); + return call_->cq()->Pluck((void *)2); + } + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf, (void *)3); + return call_->cq()->Pluck((void *)3); } - virtual bool Read(R* msg) { return context_->Read(msg); } + private: + CompletionQueue* cq_; + Call* call_; +}; + +// Async interfaces +// Common interface for all client side streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + virtual void Finish(Status* status, void* tag) = 0; +}; + +// An interface that yields a sequence of R messages. +template +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} - virtual bool Write(const W& msg) { - return context_->Write(const_cast(&msg), false); + virtual void Read(R* msg, void* tag) = 0; +}; + +// An interface that can be fed a sequence of W messages. +template +class AsyncWriterInterface { + public: + virtual ~Async WriterInterface() {} + + virtual void Write(const W& msg, void* tag) = 0; +}; + +template +class ClientAsyncReader final : public ClientAsyncStreamingInterface, + public AsyncReaderInterface { + public: + // Blocking create a stream and write the first request out. + ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, void* tag) + : call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendMessage(request); + buf.AddClientSendClose(); + call_.PerformOps(&buf, tag); + } + + virtual void Read(R *msg, void* tag) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf, tag); + } + + virtual void Finish(Status* status, void* tag) override { + CallOpBuffer buf; + buf.AddClientRecvStatus(status); + call_.PerformOps(&buf, tag); } private: - StreamContextInterface* const context_; // not owned + CompletionQueue cq_; + Call call_; }; template -class ServerAsyncResponseWriter { +class ClientWriter final : public ClientAsyncStreamingInterface, + public WriterInterface { public: - explicit ServerAsyncResponseWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + // Blocking create a stream. + ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf, tag); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast(&msg), false); + virtual void WritesDone(void* tag) { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf, tag); + } + + virtual void Finish(Status* status, void* tag) override { + CallOpBuffer buf; + buf.AddRecvMessage(response_); + buf.AddClientRecvStatus(status); + call_.PerformOps(&buf, tag); } private: - StreamContextInterface* const context_; // not owned + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; }; -template -class ServerAsyncReader : public ReaderInterface { +// Client-side interface for bi-directional streaming. +template +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, + public AsyncWriterInterface, + public AsyncReaderInterface { public: - explicit ServerAsyncReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + ClientAsyncReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Read(R *msg, void* tag) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf, tag); + } + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf, tag); + } + + virtual bool WritesDone(void* tag) { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf, tag); } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Finish(Status* status, void* tag) override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(status); + call_.PerformOps(&buf, tag); + } private: - StreamContextInterface* const context_; // not owned + CompletionQueue cq_; + Call call_; }; +// TODO(yangg) Move out of stream.h template -class ServerAsyncWriter : public WriterInterface { +class ServerAsyncResponseWriter final { public: - explicit ServerAsyncWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + explicit ServerAsyncResponseWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf, tag); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast(&msg), false); + private: + Call* call_; +}; + +template +class ServerAsyncReader : public AsyncReaderInterface { + public: + explicit ServerAsyncReader(Call* call) : call_(call) {} + + virtual void Read(R* msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; +}; + +template +class ServerAsyncWriter : public AsyncWriterInterface { + public: + explicit ServerAsyncWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) { + // TODO + } + + private: + Call* call_; }; // Server-side interface for bi-directional streaming. template -class ServerAsyncReaderWriter : public WriterInterface, - public ReaderInterface { +class ServerAsyncReaderWriter : public AsyncWriterInterface, + public AsyncReaderInterface { public: - explicit ServerAsyncReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - } + explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Read(R* msg, void* tag) { + // TODO + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast(&msg), false); + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; } // namespace grpc