Implement async streaming APIs

pull/501/head
Yang Gao 10 years ago
parent 424bc92e37
commit a38feb9be7
  1. 2
      include/grpc++/server_context.h
  2. 139
      include/grpc++/stream.h

@ -72,7 +72,7 @@ class ServerContext {
template <class R> friend class ::grpc::ServerReader;
template <class W> friend class ::grpc::ServerWriter;
template <class R, class W> friend class ::grpc::ServerReaderWriter;
ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count);
const std::chrono::system_clock::time_point deadline_;

@ -370,6 +370,15 @@ class ClientAsyncStreamingInterface {
virtual void Finish(Status* status, void* tag) = 0;
};
class ServerAsyncStreamingInterface {
public:
virtual ~ServerAsyncStreamingInterface() {}
virtual void SendInitialMetadata(void* tag) = 0;
virtual void Finish(const Status& status, void* tag) = 0;
};
// An interface that yields a sequence of R messages.
template <class R>
class AsyncReaderInterface {
@ -577,6 +586,7 @@ class ServerAsyncResponseWriter final {
virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf;
buf.Reset(tag);
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
}
@ -586,48 +596,147 @@ class ServerAsyncResponseWriter final {
};
template <class R>
class ServerAsyncReader : public AsyncReaderInterface<R> {
class ServerAsyncReader : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
explicit ServerAsyncReader(Call* call) : call_(call) {}
ServerAsyncReader(Call* call, ServerContext* ctx)
: call_(call), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
virtual void Read(R* msg, void* tag) {
// TODO
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&meta_buf_);
}
void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_->PerformOps(&read_buf_);
}
void Finish(const Status& status, void* tag) override {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_->PerformOps(&finish_buf_);
}
private:
Call* call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer finish_buf_;
};
template <class W>
class ServerAsyncWriter : public AsyncWriterInterface<W> {
class ServerAsyncWriter : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
explicit ServerAsyncWriter(Call* call) : call_(call) {}
ServerAsyncWriter(Call* call, ServerContext* ctx)
: call_(call), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&meta_buf_);
}
void Write(const W& msg, void* tag) override {
write_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_->PerformOps(&write_buf_);
}
virtual void Write(const W& msg, void* tag) {
// TODO
void Finish(const Status& status, void* tag) override {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_->PerformOps(&finish_buf_);
}
private:
Call* call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
public:
explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
ServerAsyncReaderWriter(Call* call, ServerContext* ctx)
: call_(call), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&meta_buf_);
}
virtual void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_->PerformOps(&read_buf_);
}
virtual void Read(R* msg, void* tag) {
// TODO
virtual void Write(const W& msg, void* tag) override {
write_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_->PerformOps(&write_buf_);
}
virtual void Write(const W& msg, void* tag) {
// TODO
void Finish(const Status& status, void* tag) override {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_->PerformOps(&finish_buf_);
}
private:
Call* call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
};
} // namespace grpc

Loading…
Cancel
Save