ServerReader with new API

pull/501/head
Yang Gao 10 years ago
parent cb4a825d7f
commit 8a3bbb5c55
  1. 29
      include/grpc++/stream.h

@ -96,7 +96,7 @@ class ClientReader final : public ClientStreamingInterface,
cq_.Pluck((void *)1); cq_.Pluck((void *)1);
} }
virtual bool Read(R *msg) { virtual bool Read(R *msg) override {
CallOpBuffer buf; CallOpBuffer buf;
buf.AddRecvMessage(msg); buf.AddRecvMessage(msg);
call_.PerformOps(&buf, (void *)2); call_.PerformOps(&buf, (void *)2);
@ -122,13 +122,13 @@ class ClientWriter final : public ClientStreamingInterface,
public WriterInterface<W> { public WriterInterface<W> {
public: public:
// Blocking create a stream. // Blocking create a stream.
explicit ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientWriter(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context, ClientContext *context,
google::protobuf::Message *response) google::protobuf::Message *response)
: response_(response), : response_(response),
call_(channel->CreateCall(method, context, &cq_)) {} call_(channel->CreateCall(method, context, &cq_)) {}
virtual bool Write(const W& msg) { virtual bool Write(const W& msg) override {
CallOpBuffer buf; CallOpBuffer buf;
buf.AddSendMessage(msg); buf.AddSendMessage(msg);
call_.PerformOps(&buf, (void *)2); call_.PerformOps(&buf, (void *)2);
@ -165,18 +165,18 @@ class ClientReaderWriter final : public ClientStreamingInterface,
public ReaderInterface<R> { public ReaderInterface<R> {
public: public:
// Blocking create a stream. // Blocking create a stream.
explicit ClientReaderWriter(ChannelInterface *channel, ClientReaderWriter(ChannelInterface *channel,
const RpcMethod &method, ClientContext *context) const RpcMethod &method, ClientContext *context)
: call_(channel->CreateCall(method, context, &cq_)) {} : call_(channel->CreateCall(method, context, &cq_)) {}
virtual bool Read(R *msg) { virtual bool Read(R *msg) override {
CallOpBuffer buf; CallOpBuffer buf;
buf.AddRecvMessage(msg); buf.AddRecvMessage(msg);
call_.PerformOps(&buf, (void *)2); call_.PerformOps(&buf, (void *)2);
return cq_.Pluck((void *)2); return cq_.Pluck((void *)2);
} }
virtual bool Write(const W& msg) { virtual bool Write(const W& msg) override {
CallOpBuffer buf; CallOpBuffer buf;
buf.AddSendMessage(msg); buf.AddSendMessage(msg);
call_.PerformOps(&buf, (void *)3); call_.PerformOps(&buf, (void *)3);
@ -205,17 +205,20 @@ class ClientReaderWriter final : public ClientStreamingInterface,
}; };
template <class R> template <class R>
class ServerReader : public ReaderInterface<R> { class ServerReader final : public ReaderInterface<R> {
public: public:
explicit ServerReader(StreamContextInterface* context) : context_(context) { ServerReader(CompletionQueue* cq, Call* call) : cq_(cq), call_(call) {}
GPR_ASSERT(context_);
context_->Start(true);
}
virtual bool Read(R* msg) { return context_->Read(msg); } virtual bool Read(R* msg) override {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_->PerformOps(&buf, (void *)2);
return cq_->Pluck((void *)2);
}
private: private:
StreamContextInterface* const context_; // not owned CompletionQueue* cq_;
Call* call_;
}; };
template <class W> template <class W>

Loading…
Cancel
Save