Rephrase async streaming methods

To ensure that the CallOpBuffers stay alive until completion.
pull/501/head
Craig Tiller 10 years ago
parent 80e00a8c63
commit 549a74daa8
  1. 2
      include/grpc++/call.h
  2. 78
      include/grpc++/stream.h

@ -57,6 +57,8 @@ class CallOpBuffer final : public CompletionQueueTag {
public: public:
CallOpBuffer() : return_tag_(this) {} CallOpBuffer() : return_tag_(this) {}
void Reset(void *next_return_tag);
void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata); void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata);
void AddSendMessage(const google::protobuf::Message &message); void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message); void AddRecvMessage(google::protobuf::Message *message);

@ -330,27 +330,30 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
ClientContext *context, ClientContext *context,
const google::protobuf::Message &request, void* tag) const google::protobuf::Message &request, void* tag)
: call_(channel->CreateCall(method, context, &cq_)) { : call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf; init_buf_.Reset(tag);
buf.AddSendMessage(request); init_buf_.AddSendMessage(request);
buf.AddClientSendClose(); init_buf_.AddClientSendClose();
call_.PerformOps(&buf); call_.PerformOps(&init_buf_);
} }
virtual void Read(R *msg, void* tag) override { virtual void Read(R *msg, void* tag) override {
CallOpBuffer buf; read_buf_.Reset(tag);
buf.AddRecvMessage(msg); read_buf_.AddRecvMessage(msg);
call_.PerformOps(&buf); call_.PerformOps(&read_buf_);
} }
virtual void Finish(Status* status, void* tag) override { virtual void Finish(Status* status, void* tag) override {
CallOpBuffer buf; finish_buf_.Reset(tag);
buf.AddClientRecvStatus(status); finish_buf_.AddClientRecvStatus(status);
call_.PerformOps(&buf); call_.PerformOps(&finish_buf_);
} }
private: private:
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
CallOpBuffer init_buf_;
CallOpBuffer read_buf_;
CallOpBuffer finish_buf_;
}; };
template <class W> template <class W>
@ -365,28 +368,31 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
call_(channel->CreateCall(method, context, &cq_)) {} call_(channel->CreateCall(method, context, &cq_)) {}
virtual void Write(const W& msg, void* tag) override { virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf; write_buf_.Reset(tag);
buf.AddSendMessage(msg); write_buf_.AddSendMessage(msg);
call_.PerformOps(&buf); call_.PerformOps(&write_buf_);
} }
virtual void WritesDone(void* tag) { virtual void WritesDone(void* tag) override {
CallOpBuffer buf; writes_done_buf_.Reset(tag);
buf.AddClientSendClose(); writes_done_buf_.AddClientSendClose();
call_.PerformOps(&buf); call_.PerformOps(&writes_done_buf_);
} }
virtual void Finish(Status* status, void* tag) override { virtual void Finish(Status* status, void* tag) override {
CallOpBuffer buf; finish_buf_.Reset(tag);
buf.AddRecvMessage(response_); finish_buf_.AddRecvMessage(response_);
buf.AddClientRecvStatus(status); finish_buf_.AddClientRecvStatus(status);
call_.PerformOps(&buf); call_.PerformOps(&finish_buf_);
} }
private: private:
google::protobuf::Message *const response_; google::protobuf::Message *const response_;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_;
}; };
// Client-side interface for bi-directional streaming. // Client-side interface for bi-directional streaming.
@ -400,32 +406,36 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
: call_(channel->CreateCall(method, context, &cq_)) {} : call_(channel->CreateCall(method, context, &cq_)) {}
virtual void Read(R *msg, void* tag) override { virtual void Read(R *msg, void* tag) override {
CallOpBuffer buf; read_buf_.Reset(tag);
buf.AddRecvMessage(msg); read_buf_.AddRecvMessage(msg);
call_.PerformOps(&buf); call_.PerformOps(&read_buf_);
} }
virtual void Write(const W& msg, void* tag) override { virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf; write_buf_.Reset(tag);
buf.AddSendMessage(msg); write_buf_.AddSendMessage(msg);
call_.PerformOps(&buf); call_.PerformOps(&write_buf_);
} }
virtual void WritesDone(void* tag) { virtual void WritesDone(void* tag) override {
CallOpBuffer buf; writes_done_buf_.Reset(tag);
buf.AddClientSendClose(); writes_done_buf_.AddClientSendClose();
call_.PerformOps(&buf); call_.PerformOps(&writes_done_buf_);
} }
virtual void Finish(Status* status, void* tag) override { virtual void Finish(Status* status, void* tag) override {
CallOpBuffer buf; finish_buf_.Reset(tag);
buf.AddClientRecvStatus(status); finish_buf_.AddClientRecvStatus(status);
call_.PerformOps(&buf); call_.PerformOps(&finish_buf_);
} }
private: private:
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_;
}; };
// TODO(yangg) Move out of stream.h // TODO(yangg) Move out of stream.h

Loading…
Cancel
Save