From 549a74daa87c871b7bf47d11f6f0539f3235b631 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 9 Feb 2015 22:13:44 -0800 Subject: [PATCH] Rephrase async streaming methods To ensure that the CallOpBuffers stay alive until completion. --- include/grpc++/call.h | 2 ++ include/grpc++/stream.h | 78 +++++++++++++++++++++++------------------ 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/include/grpc++/call.h b/include/grpc++/call.h index 94215bfa986..de789febe6e 100644 --- a/include/grpc++/call.h +++ b/include/grpc++/call.h @@ -57,6 +57,8 @@ class CallOpBuffer final : public CompletionQueueTag { public: CallOpBuffer() : return_tag_(this) {} + void Reset(void *next_return_tag); + void AddSendInitialMetadata(std::vector > *metadata); void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index ca32d60810d..631183ea55c 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -330,27 +330,30 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface, ClientContext *context, const google::protobuf::Message &request, void* tag) : call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendMessage(request); - buf.AddClientSendClose(); - call_.PerformOps(&buf); + init_buf_.Reset(tag); + init_buf_.AddSendMessage(request); + init_buf_.AddClientSendClose(); + call_.PerformOps(&init_buf_); } virtual void Read(R *msg, void* tag) override { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); } virtual void Finish(Status* status, void* tag) override { - CallOpBuffer buf; - buf.AddClientRecvStatus(status); - call_.PerformOps(&buf); + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); } private: CompletionQueue cq_; Call call_; + CallOpBuffer init_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; }; template @@ -365,28 +368,31 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, call_(channel->CreateCall(method, context, &cq_)) {} virtual void Write(const W& msg, void* tag) override { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); } virtual void Finish(Status* status, void* tag) override { - CallOpBuffer buf; - buf.AddRecvMessage(response_); - buf.AddClientRecvStatus(status); - call_.PerformOps(&buf); + finish_buf_.Reset(tag); + finish_buf_.AddRecvMessage(response_); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); } private: google::protobuf::Message *const response_; CompletionQueue cq_; Call call_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; // Client-side interface for bi-directional streaming. @@ -400,32 +406,36 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, : call_(channel->CreateCall(method, context, &cq_)) {} virtual void Read(R *msg, void* tag) override { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); } virtual void Write(const W& msg, void* tag) override { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); } virtual void Finish(Status* status, void* tag) override { - CallOpBuffer buf; - buf.AddClientRecvStatus(status); - call_.PerformOps(&buf); + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); } private: CompletionQueue cq_; Call call_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; // TODO(yangg) Move out of stream.h