|
|
|
@ -93,15 +93,18 @@ template <class R> |
|
|
|
|
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { |
|
|
|
|
public: |
|
|
|
|
// Blocking create a stream and write the first request out.
|
|
|
|
|
template <class W> |
|
|
|
|
ClientReader(ChannelInterface* channel, const RpcMethod& method, |
|
|
|
|
ClientContext* context, const grpc::protobuf::Message& request) |
|
|
|
|
ClientContext* context, const W& request) |
|
|
|
|
: context_(context), call_(channel->CreateCall(method, context, &cq_)) { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendInitialMetadata(&context->send_initial_metadata_); |
|
|
|
|
buf.AddSendMessage(request); |
|
|
|
|
buf.AddClientSendClose(); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
cq_.Pluck(&buf); |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, |
|
|
|
|
CallOpClientSendClose> ops; |
|
|
|
|
ops.SendInitialMetadata(context->send_initial_metadata_); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_ASSERT(ops.SendMessage(request).ok()); |
|
|
|
|
ops.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
cq_.Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Blocking wait for initial metadata from server. The received metadata
|
|
|
|
@ -111,28 +114,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { |
|
|
|
|
void WaitForInitialMetadata() { |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddRecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
cq_.Pluck(&buf); // status ignored
|
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata> ops; |
|
|
|
|
ops.RecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
cq_.Pluck(&ops); // status ignored
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Read(R* msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
buf.AddRecvInitialMetadata(context_); |
|
|
|
|
ops.RecvInitialMetadata(context_); |
|
|
|
|
} |
|
|
|
|
buf.AddRecvMessage(msg); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
return cq_.Pluck(&buf) && buf.got_message; |
|
|
|
|
ops.RecvMessage(msg); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
return cq_.Pluck(&ops) && ops.got_message; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status Finish() GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
CallOpSet<CallOpClientRecvStatus> ops; |
|
|
|
|
Status status; |
|
|
|
|
buf.AddClientRecvStatus(context_, &status); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
GPR_ASSERT(cq_.Pluck(&buf)); |
|
|
|
|
ops.ClientRecvStatus(context_, &status); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
GPR_ASSERT(cq_.Pluck(&ops)); |
|
|
|
|
return status; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -150,48 +153,49 @@ class ClientWriterInterface : public ClientStreamingInterface, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
template <class W> |
|
|
|
|
class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> { |
|
|
|
|
class ClientWriter : public ClientWriterInterface<W> { |
|
|
|
|
public: |
|
|
|
|
// Blocking create a stream.
|
|
|
|
|
template <class R> |
|
|
|
|
ClientWriter(ChannelInterface* channel, const RpcMethod& method, |
|
|
|
|
ClientContext* context, grpc::protobuf::Message* response) |
|
|
|
|
: context_(context), |
|
|
|
|
response_(response), |
|
|
|
|
call_(channel->CreateCall(method, context, &cq_)) { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendInitialMetadata(&context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
cq_.Pluck(&buf); |
|
|
|
|
ClientContext* context, R* response) |
|
|
|
|
: context_(context), call_(channel->CreateCall(method, context, &cq_)) { |
|
|
|
|
finish_ops_.RecvMessage(response); |
|
|
|
|
|
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> ops; |
|
|
|
|
ops.SendInitialMetadata(context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
cq_.Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Write(const W& msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendMessage(msg); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
return cq_.Pluck(&buf); |
|
|
|
|
CallOpSet<CallOpSendMessage> ops; |
|
|
|
|
if (!ops.SendMessage(msg).ok()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
return cq_.Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool WritesDone() GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddClientSendClose(); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
return cq_.Pluck(&buf); |
|
|
|
|
CallOpSet<CallOpClientSendClose> ops; |
|
|
|
|
ops.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
return cq_.Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Read the final response and wait for the final status.
|
|
|
|
|
Status Finish() GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
Status status; |
|
|
|
|
buf.AddRecvMessage(response_); |
|
|
|
|
buf.AddClientRecvStatus(context_, &status); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
GPR_ASSERT(cq_.Pluck(&buf)); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, &status); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
GPR_ASSERT(cq_.Pluck(&finish_ops_)); |
|
|
|
|
return status; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
ClientContext* context_; |
|
|
|
|
grpc::protobuf::Message* const response_; |
|
|
|
|
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; |
|
|
|
|
CompletionQueue cq_; |
|
|
|
|
Call call_; |
|
|
|
|
}; |
|
|
|
@ -213,10 +217,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { |
|
|
|
|
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, |
|
|
|
|
ClientContext* context) |
|
|
|
|
: context_(context), call_(channel->CreateCall(method, context, &cq_)) { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendInitialMetadata(&context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
cq_.Pluck(&buf); |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> ops; |
|
|
|
|
ops.SendInitialMetadata(context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
cq_.Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Blocking wait for initial metadata from server. The received metadata
|
|
|
|
@ -226,42 +230,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { |
|
|
|
|
void WaitForInitialMetadata() { |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddRecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
cq_.Pluck(&buf); // status ignored
|
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata> ops; |
|
|
|
|
ops.RecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
cq_.Pluck(&ops); // status ignored
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Read(R* msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
buf.AddRecvInitialMetadata(context_); |
|
|
|
|
ops.RecvInitialMetadata(context_); |
|
|
|
|
} |
|
|
|
|
buf.AddRecvMessage(msg); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
return cq_.Pluck(&buf) && buf.got_message; |
|
|
|
|
ops.RecvMessage(msg); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
return cq_.Pluck(&ops) && ops.got_message; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Write(const W& msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendMessage(msg); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
return cq_.Pluck(&buf); |
|
|
|
|
CallOpSet<CallOpSendMessage> ops; |
|
|
|
|
if (!ops.SendMessage(msg).ok()) return false; |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
return cq_.Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool WritesDone() GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddClientSendClose(); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
return cq_.Pluck(&buf); |
|
|
|
|
CallOpSet<CallOpClientSendClose> ops; |
|
|
|
|
ops.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
return cq_.Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status Finish() GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
CallOpSet<CallOpClientRecvStatus> ops; |
|
|
|
|
Status status; |
|
|
|
|
buf.AddClientRecvStatus(context_, &status); |
|
|
|
|
call_.PerformOps(&buf); |
|
|
|
|
GPR_ASSERT(cq_.Pluck(&buf)); |
|
|
|
|
ops.ClientRecvStatus(context_, &status); |
|
|
|
|
call_.PerformOps(&ops); |
|
|
|
|
GPR_ASSERT(cq_.Pluck(&ops)); |
|
|
|
|
return status; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -279,18 +283,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> { |
|
|
|
|
void SendInitialMetadata() { |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> ops; |
|
|
|
|
ops.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
call_->PerformOps(&buf); |
|
|
|
|
call_->cq()->Pluck(&buf); |
|
|
|
|
call_->PerformOps(&ops); |
|
|
|
|
call_->cq()->Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Read(R* msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddRecvMessage(msg); |
|
|
|
|
call_->PerformOps(&buf); |
|
|
|
|
return call_->cq()->Pluck(&buf) && buf.got_message; |
|
|
|
|
CallOpSet<CallOpRecvMessage<R>> ops; |
|
|
|
|
ops.RecvMessage(msg); |
|
|
|
|
call_->PerformOps(&ops); |
|
|
|
|
return call_->cq()->Pluck(&ops) && ops.got_message; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -306,22 +310,24 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { |
|
|
|
|
void SendInitialMetadata() { |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> ops; |
|
|
|
|
ops.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
call_->PerformOps(&buf); |
|
|
|
|
call_->cq()->Pluck(&buf); |
|
|
|
|
call_->PerformOps(&ops); |
|
|
|
|
call_->cq()->Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Write(const W& msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; |
|
|
|
|
if (!ops.SendMessage(msg).ok()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
buf.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
ops.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
buf.AddSendMessage(msg); |
|
|
|
|
call_->PerformOps(&buf); |
|
|
|
|
return call_->cq()->Pluck(&buf); |
|
|
|
|
call_->PerformOps(&ops); |
|
|
|
|
return call_->cq()->Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -339,29 +345,31 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, |
|
|
|
|
void SendInitialMetadata() { |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> ops; |
|
|
|
|
ops.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
call_->PerformOps(&buf); |
|
|
|
|
call_->cq()->Pluck(&buf); |
|
|
|
|
call_->PerformOps(&ops); |
|
|
|
|
call_->cq()->Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Read(R* msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
buf.AddRecvMessage(msg); |
|
|
|
|
call_->PerformOps(&buf); |
|
|
|
|
return call_->cq()->Pluck(&buf) && buf.got_message; |
|
|
|
|
CallOpSet<CallOpRecvMessage<R>> ops; |
|
|
|
|
ops.RecvMessage(msg); |
|
|
|
|
call_->PerformOps(&ops); |
|
|
|
|
return call_->cq()->Pluck(&ops) && ops.got_message; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Write(const W& msg) GRPC_OVERRIDE { |
|
|
|
|
CallOpBuffer buf; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; |
|
|
|
|
if (!ops.SendMessage(msg).ok()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
buf.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
ops.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
buf.AddSendMessage(msg); |
|
|
|
|
call_->PerformOps(&buf); |
|
|
|
|
return call_->cq()->Pluck(&buf); |
|
|
|
|
call_->PerformOps(&ops); |
|
|
|
|
return call_->cq()->Pluck(&ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -400,57 +408,59 @@ class AsyncWriterInterface { |
|
|
|
|
|
|
|
|
|
template <class R> |
|
|
|
|
class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, |
|
|
|
|
public AsyncReaderInterface<R> { |
|
|
|
|
}; |
|
|
|
|
public AsyncReaderInterface<R> {}; |
|
|
|
|
|
|
|
|
|
template <class R> |
|
|
|
|
class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { |
|
|
|
|
public: |
|
|
|
|
// Create a stream and write the first request out.
|
|
|
|
|
template <class W> |
|
|
|
|
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, |
|
|
|
|
const RpcMethod& method, ClientContext* context, |
|
|
|
|
const grpc::protobuf::Message& request, void* tag) |
|
|
|
|
const W& request, void* tag) |
|
|
|
|
: context_(context), call_(channel->CreateCall(method, context, cq)) { |
|
|
|
|
init_buf_.Reset(tag); |
|
|
|
|
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); |
|
|
|
|
init_buf_.AddSendMessage(request); |
|
|
|
|
init_buf_.AddClientSendClose(); |
|
|
|
|
call_.PerformOps(&init_buf_); |
|
|
|
|
init_ops_.set_output_tag(tag); |
|
|
|
|
init_ops_.SendInitialMetadata(context->send_initial_metadata_); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_ASSERT(init_ops_.SendMessage(request).ok()); |
|
|
|
|
init_ops_.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&init_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
meta_buf_.Reset(tag); |
|
|
|
|
meta_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&meta_buf_); |
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.RecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&meta_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Read(R* msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
read_buf_.Reset(tag); |
|
|
|
|
read_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
read_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
read_ops_.RecvInitialMetadata(context_); |
|
|
|
|
} |
|
|
|
|
read_buf_.AddRecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_buf_); |
|
|
|
|
read_ops_.RecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Finish(Status* status, void* tag) GRPC_OVERRIDE { |
|
|
|
|
finish_buf_.Reset(tag); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
finish_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
finish_ops_.RecvInitialMetadata(context_); |
|
|
|
|
} |
|
|
|
|
finish_buf_.AddClientRecvStatus(context_, status); |
|
|
|
|
call_.PerformOps(&finish_buf_); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, status); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
ClientContext* context_; |
|
|
|
|
Call call_; |
|
|
|
|
CallOpBuffer init_buf_; |
|
|
|
|
CallOpBuffer meta_buf_; |
|
|
|
|
CallOpBuffer read_buf_; |
|
|
|
|
CallOpBuffer finish_buf_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> |
|
|
|
|
init_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
template <class W> |
|
|
|
@ -463,56 +473,57 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, |
|
|
|
|
template <class W> |
|
|
|
|
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { |
|
|
|
|
public: |
|
|
|
|
template <class R> |
|
|
|
|
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, |
|
|
|
|
const RpcMethod& method, ClientContext* context, |
|
|
|
|
grpc::protobuf::Message* response, void* tag) |
|
|
|
|
: context_(context), |
|
|
|
|
response_(response), |
|
|
|
|
call_(channel->CreateCall(method, context, cq)) { |
|
|
|
|
init_buf_.Reset(tag); |
|
|
|
|
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&init_buf_); |
|
|
|
|
R* response, void* tag) |
|
|
|
|
: context_(context), call_(channel->CreateCall(method, context, cq)) { |
|
|
|
|
finish_ops_.RecvMessage(response); |
|
|
|
|
|
|
|
|
|
init_ops_.set_output_tag(tag); |
|
|
|
|
init_ops_.SendInitialMetadata(context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&init_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
meta_buf_.Reset(tag); |
|
|
|
|
meta_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&meta_buf_); |
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.RecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&meta_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
write_buf_.Reset(tag); |
|
|
|
|
write_buf_.AddSendMessage(msg); |
|
|
|
|
call_.PerformOps(&write_buf_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void WritesDone(void* tag) GRPC_OVERRIDE { |
|
|
|
|
writes_done_buf_.Reset(tag); |
|
|
|
|
writes_done_buf_.AddClientSendClose(); |
|
|
|
|
call_.PerformOps(&writes_done_buf_); |
|
|
|
|
writes_done_ops_.set_output_tag(tag); |
|
|
|
|
writes_done_ops_.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&writes_done_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Finish(Status* status, void* tag) GRPC_OVERRIDE { |
|
|
|
|
finish_buf_.Reset(tag); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
finish_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
finish_ops_.RecvInitialMetadata(context_); |
|
|
|
|
} |
|
|
|
|
finish_buf_.AddRecvMessage(response_); |
|
|
|
|
finish_buf_.AddClientRecvStatus(context_, status); |
|
|
|
|
call_.PerformOps(&finish_buf_); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, status); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
ClientContext* context_; |
|
|
|
|
grpc::protobuf::Message* const response_; |
|
|
|
|
Call call_; |
|
|
|
|
CallOpBuffer init_buf_; |
|
|
|
|
CallOpBuffer meta_buf_; |
|
|
|
|
CallOpBuffer write_buf_; |
|
|
|
|
CallOpBuffer writes_done_buf_; |
|
|
|
|
CallOpBuffer finish_buf_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> init_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; |
|
|
|
|
CallOpSet<CallOpSendMessage> write_ops_; |
|
|
|
|
CallOpSet<CallOpClientSendClose> writes_done_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, |
|
|
|
|
CallOpClientRecvStatus> finish_ops_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Client-side interface for bi-directional streaming.
|
|
|
|
@ -532,58 +543,59 @@ class ClientAsyncReaderWriter GRPC_FINAL |
|
|
|
|
const RpcMethod& method, ClientContext* context, |
|
|
|
|
void* tag) |
|
|
|
|
: context_(context), call_(channel->CreateCall(method, context, cq)) { |
|
|
|
|
init_buf_.Reset(tag); |
|
|
|
|
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&init_buf_); |
|
|
|
|
init_ops_.set_output_tag(tag); |
|
|
|
|
init_ops_.SendInitialMetadata(context->send_initial_metadata_); |
|
|
|
|
call_.PerformOps(&init_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
meta_buf_.Reset(tag); |
|
|
|
|
meta_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&meta_buf_); |
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.RecvInitialMetadata(context_); |
|
|
|
|
call_.PerformOps(&meta_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Read(R* msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
read_buf_.Reset(tag); |
|
|
|
|
read_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
read_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
read_ops_.RecvInitialMetadata(context_); |
|
|
|
|
} |
|
|
|
|
read_buf_.AddRecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_buf_); |
|
|
|
|
read_ops_.RecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
write_buf_.Reset(tag); |
|
|
|
|
write_buf_.AddSendMessage(msg); |
|
|
|
|
call_.PerformOps(&write_buf_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void WritesDone(void* tag) GRPC_OVERRIDE { |
|
|
|
|
writes_done_buf_.Reset(tag); |
|
|
|
|
writes_done_buf_.AddClientSendClose(); |
|
|
|
|
call_.PerformOps(&writes_done_buf_); |
|
|
|
|
writes_done_ops_.set_output_tag(tag); |
|
|
|
|
writes_done_ops_.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&writes_done_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Finish(Status* status, void* tag) GRPC_OVERRIDE { |
|
|
|
|
finish_buf_.Reset(tag); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
finish_buf_.AddRecvInitialMetadata(context_); |
|
|
|
|
finish_ops_.RecvInitialMetadata(context_); |
|
|
|
|
} |
|
|
|
|
finish_buf_.AddClientRecvStatus(context_, status); |
|
|
|
|
call_.PerformOps(&finish_buf_); |
|
|
|
|
finish_ops_.ClientRecvStatus(context_, status); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
ClientContext* context_; |
|
|
|
|
Call call_; |
|
|
|
|
CallOpBuffer init_buf_; |
|
|
|
|
CallOpBuffer meta_buf_; |
|
|
|
|
CallOpBuffer read_buf_; |
|
|
|
|
CallOpBuffer write_buf_; |
|
|
|
|
CallOpBuffer writes_done_buf_; |
|
|
|
|
CallOpBuffer finish_buf_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> init_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; |
|
|
|
|
CallOpSet<CallOpSendMessage> write_ops_; |
|
|
|
|
CallOpSet<CallOpClientSendClose> writes_done_ops_; |
|
|
|
|
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
template <class W, class R> |
|
|
|
@ -596,41 +608,44 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, |
|
|
|
|
void SendInitialMetadata(void* tag) GRPC_OVERRIDE { |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
meta_buf_.Reset(tag); |
|
|
|
|
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
call_.PerformOps(&meta_buf_); |
|
|
|
|
call_.PerformOps(&meta_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Read(R* msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
read_buf_.Reset(tag); |
|
|
|
|
read_buf_.AddRecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_buf_); |
|
|
|
|
read_ops_.set_output_tag(tag); |
|
|
|
|
read_ops_.RecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Finish(const W& msg, const Status& status, void* tag) { |
|
|
|
|
finish_buf_.Reset(tag); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
// The response is dropped if the status is not OK.
|
|
|
|
|
if (status.ok()) { |
|
|
|
|
finish_buf_.AddSendMessage(msg); |
|
|
|
|
finish_ops_.ServerSendStatus( |
|
|
|
|
ctx_->trailing_metadata_, |
|
|
|
|
finish_ops_.SendMessage(msg)); |
|
|
|
|
} else { |
|
|
|
|
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
|
|
|
|
} |
|
|
|
|
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&finish_buf_); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void FinishWithError(const Status& status, void* tag) { |
|
|
|
|
GPR_ASSERT(!status.ok()); |
|
|
|
|
finish_buf_.Reset(tag); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&finish_buf_); |
|
|
|
|
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -638,9 +653,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, |
|
|
|
|
|
|
|
|
|
Call call_; |
|
|
|
|
ServerContext* ctx_; |
|
|
|
|
CallOpBuffer meta_buf_; |
|
|
|
|
CallOpBuffer read_buf_; |
|
|
|
|
CallOpBuffer finish_buf_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> meta_ops_; |
|
|
|
|
CallOpSet<CallOpRecvMessage<R>> read_ops_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, |
|
|
|
|
CallOpServerSendStatus> finish_ops_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
template <class W> |
|
|
|
@ -653,30 +669,31 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, |
|
|
|
|
void SendInitialMetadata(void* tag) GRPC_OVERRIDE { |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
meta_buf_.Reset(tag); |
|
|
|
|
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
call_.PerformOps(&meta_buf_); |
|
|
|
|
call_.PerformOps(&meta_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
write_buf_.Reset(tag); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
write_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
write_buf_.AddSendMessage(msg); |
|
|
|
|
call_.PerformOps(&write_buf_); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Finish(const Status& status, void* tag) { |
|
|
|
|
finish_buf_.Reset(tag); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&finish_buf_); |
|
|
|
|
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -684,9 +701,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, |
|
|
|
|
|
|
|
|
|
Call call_; |
|
|
|
|
ServerContext* ctx_; |
|
|
|
|
CallOpBuffer meta_buf_; |
|
|
|
|
CallOpBuffer write_buf_; |
|
|
|
|
CallOpBuffer finish_buf_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> meta_ops_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Server-side interface for bi-directional streaming.
|
|
|
|
@ -701,36 +718,37 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, |
|
|
|
|
void SendInitialMetadata(void* tag) GRPC_OVERRIDE { |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
meta_buf_.Reset(tag); |
|
|
|
|
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
call_.PerformOps(&meta_buf_); |
|
|
|
|
call_.PerformOps(&meta_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Read(R* msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
read_buf_.Reset(tag); |
|
|
|
|
read_buf_.AddRecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_buf_); |
|
|
|
|
read_ops_.set_output_tag(tag); |
|
|
|
|
read_ops_.RecvMessage(msg); |
|
|
|
|
call_.PerformOps(&read_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
|
|
|
|
write_buf_.Reset(tag); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
write_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
write_buf_.AddSendMessage(msg); |
|
|
|
|
call_.PerformOps(&write_buf_); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Finish(const Status& status, void* tag) { |
|
|
|
|
finish_buf_.Reset(tag); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); |
|
|
|
|
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
|
|
|
|
ctx_->sent_initial_metadata_ = true; |
|
|
|
|
} |
|
|
|
|
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&finish_buf_); |
|
|
|
|
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&finish_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -738,10 +756,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, |
|
|
|
|
|
|
|
|
|
Call call_; |
|
|
|
|
ServerContext* ctx_; |
|
|
|
|
CallOpBuffer meta_buf_; |
|
|
|
|
CallOpBuffer read_buf_; |
|
|
|
|
CallOpBuffer write_buf_; |
|
|
|
|
CallOpBuffer finish_buf_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata> meta_ops_; |
|
|
|
|
CallOpSet<CallOpRecvMessage<R>> read_ops_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; |
|
|
|
|
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
} // namespace grpc
|
|
|
|
|