Merge branch 'c++api' of github.com:ctiller/grpc into c++api

pull/501/head
Craig Tiller 10 years ago
commit 7478d9184f
  1. 244
      include/grpc++/stream.h

@ -365,9 +365,20 @@ class ClientAsyncStreamingInterface {
public: public:
virtual ~ClientAsyncStreamingInterface() {} virtual ~ClientAsyncStreamingInterface() {}
virtual void ReadInitialMetadata(void* tag) = 0;
virtual void Finish(Status* status, void* tag) = 0; virtual void Finish(Status* status, void* tag) = 0;
}; };
class ServerAsyncStreamingInterface {
public:
virtual ~ServerAsyncStreamingInterface() {}
virtual void SendInitialMetadata(void* tag) = 0;
virtual void Finish(const Status& status, void* tag) = 0;
};
// An interface that yields a sequence of R messages. // An interface that yields a sequence of R messages.
template <class R> template <class R>
class AsyncReaderInterface { class AsyncReaderInterface {
@ -390,30 +401,50 @@ template <class R>
class ClientAsyncReader final : public ClientAsyncStreamingInterface, class ClientAsyncReader final : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> { public AsyncReaderInterface<R> {
public: public:
// Blocking create a stream and write the first request out. // Create a stream and write the first request out.
ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context, ClientContext *context,
const google::protobuf::Message &request, void* tag) const google::protobuf::Message &request, void* tag)
: call_(channel->CreateCall(method, context, &cq_)) { : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
init_buf_.Reset(tag); init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request); init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose(); init_buf_.AddClientSendClose();
call_.PerformOps(&init_buf_); call_.PerformOps(&init_buf_);
} }
virtual void Read(R *msg, void* tag) override { void ReadInitialMetadata(void* tag) override {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.Reset(tag);
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
call_.PerformOps(&buf);
context_->initial_metadata_received_ = true;
}
void Read(R *msg, void* tag) override {
read_buf_.Reset(tag); read_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
read_buf_.AddRecvMessage(msg); read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_); call_.PerformOps(&read_buf_);
} }
virtual void Finish(Status* status, void* tag) override { void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag); finish_buf_.Reset(tag);
finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_); call_.PerformOps(&finish_buf_);
} }
private: private:
ClientContext* context_ = nullptr;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
CallOpBuffer init_buf_; CallOpBuffer init_buf_;
@ -425,37 +456,56 @@ template <class W>
class ClientAsyncWriter final : public ClientAsyncStreamingInterface, class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
public WriterInterface<W> { public WriterInterface<W> {
public: public:
// Blocking create a stream.
ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context, ClientContext *context,
google::protobuf::Message *response) google::protobuf::Message *response, void* tag)
: response_(response), : context_(context), response_(response),
call_(channel->CreateCall(method, context, &cq_)) {} call_(channel->CreateCall(method, context, &cq_)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&init_buf_);
}
virtual void Write(const W& msg, void* tag) override { void ReadInitialMetadata(void* tag) override {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.Reset(tag);
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
call_.PerformOps(&buf);
context_->initial_metadata_received_ = true;
}
void Write(const W& msg, void* tag) override {
write_buf_.Reset(tag); write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg); write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_); call_.PerformOps(&write_buf_);
} }
virtual void WritesDone(void* tag) override { void WritesDone(void* tag) override {
writes_done_buf_.Reset(tag); writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose(); writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_); call_.PerformOps(&writes_done_buf_);
} }
virtual void Finish(Status* status, void* tag) override { void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag); finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
finish_buf_.AddRecvMessage(response_, &got_message_); finish_buf_.AddRecvMessage(response_, &got_message_);
finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_); call_.PerformOps(&finish_buf_);
} }
private: private:
ClientContext* context_ = nullptr;
google::protobuf::Message *const response_; google::protobuf::Message *const response_;
bool got_message_; bool got_message_;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
CallOpBuffer init_buf_;
CallOpBuffer write_buf_; CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_; CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_; CallOpBuffer finish_buf_;
@ -468,36 +518,60 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> { public AsyncReaderInterface<R> {
public: public:
ClientAsyncReaderWriter(ChannelInterface *channel, ClientAsyncReaderWriter(ChannelInterface *channel,
const RpcMethod &method, ClientContext *context) const RpcMethod &method, ClientContext *context, void* tag)
: call_(channel->CreateCall(method, context, &cq_)) {} : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) override {
GPR_ASSERT(!context_->initial_metadata_received_);
virtual void Read(R *msg, void* tag) override { CallOpBuffer buf;
buf.Reset(tag);
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
call_.PerformOps(&buf);
context_->initial_metadata_received_ = true;
}
void Read(R *msg, void* tag) override {
read_buf_.Reset(tag); read_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
read_buf_.AddRecvMessage(msg); read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_); call_.PerformOps(&read_buf_);
} }
virtual void Write(const W& msg, void* tag) override { void Write(const W& msg, void* tag) override {
write_buf_.Reset(tag); write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg); write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_); call_.PerformOps(&write_buf_);
} }
virtual void WritesDone(void* tag) override { void WritesDone(void* tag) override {
writes_done_buf_.Reset(tag); writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose(); writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_); call_.PerformOps(&writes_done_buf_);
} }
virtual void Finish(Status* status, void* tag) override { void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag); finish_buf_.Reset(tag);
finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_); call_.PerformOps(&finish_buf_);
} }
private: private:
ClientContext* context_ = nullptr;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
CallOpBuffer init_buf_;
CallOpBuffer read_buf_; CallOpBuffer read_buf_;
CallOpBuffer write_buf_; CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_; CallOpBuffer writes_done_buf_;
@ -512,6 +586,7 @@ class ServerAsyncResponseWriter final {
virtual void Write(const W& msg, void* tag) override { virtual void Write(const W& msg, void* tag) override {
CallOpBuffer buf; CallOpBuffer buf;
buf.Reset(tag);
buf.AddSendMessage(msg); buf.AddSendMessage(msg);
call_->PerformOps(&buf); call_->PerformOps(&buf);
} }
@ -521,48 +596,147 @@ class ServerAsyncResponseWriter final {
}; };
template <class R> template <class R>
class ServerAsyncReader : public AsyncReaderInterface<R> { class ServerAsyncReader : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public: public:
explicit ServerAsyncReader(Call* call) : call_(call) {} ServerAsyncReader(Call* call, ServerContext* ctx)
: call_(call), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&meta_buf_);
}
virtual void Read(R* msg, void* tag) { void Read(R* msg, void* tag) override {
// TODO read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_->PerformOps(&read_buf_);
}
void Finish(const Status& status, void* tag) override {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_->PerformOps(&finish_buf_);
} }
private: private:
Call* call_; Call* call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer finish_buf_;
}; };
template <class W> template <class W>
class ServerAsyncWriter : public AsyncWriterInterface<W> { class ServerAsyncWriter : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public: public:
explicit ServerAsyncWriter(Call* call) : call_(call) {} ServerAsyncWriter(Call* call, ServerContext* ctx)
: call_(call), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&meta_buf_);
}
void Write(const W& msg, void* tag) override {
write_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_->PerformOps(&write_buf_);
}
virtual void Write(const W& msg, void* tag) { void Finish(const Status& status, void* tag) override {
// TODO finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_->PerformOps(&finish_buf_);
} }
private: private:
Call* call_; Call* call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
}; };
// Server-side interface for bi-directional streaming. // Server-side interface for bi-directional streaming.
template <class W, class R> template <class W, class R>
class ServerAsyncReaderWriter : public AsyncWriterInterface<W>, class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> { public AsyncWriterInterface<W>,
public AsyncReaderInterface<R> {
public: public:
explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} ServerAsyncReaderWriter(Call* call, ServerContext* ctx)
: call_(call), ctx_(ctx) {}
void SendInitialMetadata(void* tag) override {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&meta_buf_);
}
virtual void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_->PerformOps(&read_buf_);
}
virtual void Read(R* msg, void* tag) { virtual void Write(const W& msg, void* tag) override {
// TODO write_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_->PerformOps(&write_buf_);
} }
virtual void Write(const W& msg, void* tag) { void Finish(const Status& status, void* tag) override {
// TODO finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_->PerformOps(&finish_buf_);
} }
private: private:
Call* call_; Call* call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
}; };
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save