Undo much of the previous commits so that only CallOpSet groups

with a Sneaky member are in a collection.
pull/5099/head
Vijay Pai 9 years ago
parent 5014341da6
commit 5506bea349
  1. 287
      include/grpc++/impl/codegen/async_stream.h
  2. 41
      include/grpc++/impl/codegen/async_unary_call.h
  3. 6
      include/grpc++/impl/codegen/call.h

@ -105,62 +105,49 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
const W& request, void* tag) const W& request, void* tag)
: context_(context), : context_(context), call_(channel->CreateCall(method, context, cq)) {
call_(channel->CreateCall(method, context, cq)), init_ops_.set_output_tag(tag);
collection_(new CallOpSetCollection) { init_ops_.SendInitialMetadata(context->send_initial_metadata_);
collection_->SetCollection();
collection_->init_ops_.set_output_tag(tag);
collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok()); GPR_ASSERT(init_ops_.SendMessage(request).ok());
collection_->init_ops_.ClientSendClose(); init_ops_.ClientSendClose();
call_.PerformOps(&collection_->init_ops_); call_.PerformOps(&init_ops_);
} }
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_); GPR_ASSERT(!context_->initial_metadata_received_);
collection_->meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
collection_->meta_ops_.RecvInitialMetadata(context_); meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&collection_->meta_ops_); call_.PerformOps(&meta_ops_);
} }
void Read(R* msg, void* tag) GRPC_OVERRIDE { void Read(R* msg, void* tag) GRPC_OVERRIDE {
collection_->read_ops_.set_output_tag(tag); read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
collection_->read_ops_.RecvInitialMetadata(context_); read_ops_.RecvInitialMetadata(context_);
} }
collection_->read_ops_.RecvMessage(msg); read_ops_.RecvMessage(msg);
call_.PerformOps(&collection_->read_ops_); call_.PerformOps(&read_ops_);
} }
void Finish(Status* status, void* tag) GRPC_OVERRIDE { void Finish(Status* status, void* tag) GRPC_OVERRIDE {
collection_->finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
collection_->finish_ops_.RecvInitialMetadata(context_); finish_ops_.RecvInitialMetadata(context_);
} }
collection_->finish_ops_.ClientRecvStatus(context_, status); finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&collection_->finish_ops_); call_.PerformOps(&finish_ops_);
} }
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
class CallOpSetCollection : public CallOpSetCollectionInterface { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
public: init_ops_;
void SetCollection() {
init_ops_.SetCollection(shared_from_this());
meta_ops_.SetCollection(shared_from_this());
read_ops_.SetCollection(shared_from_this());
finish_ops_.SetCollection(shared_from_this());
}
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
std::shared_ptr<CallOpSetCollection> collection_;
}; };
/// Common interface for client side asynchronous writing. /// Common interface for client side asynchronous writing.
@ -181,67 +168,53 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
R* response, void* tag) R* response, void* tag)
: context_(context), : context_(context), call_(channel->CreateCall(method, context, cq)) {
call_(channel->CreateCall(method, context, cq)), finish_ops_.RecvMessage(response);
collection_(new CallOpSetCollection) {
collection_->SetCollection();
collection_->finish_ops_.RecvMessage(response);
collection_->init_ops_.set_output_tag(tag); init_ops_.set_output_tag(tag);
collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&collection_->init_ops_); call_.PerformOps(&init_ops_);
} }
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_); GPR_ASSERT(!context_->initial_metadata_received_);
collection_->meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
collection_->meta_ops_.RecvInitialMetadata(context_); meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&collection_->meta_ops_); call_.PerformOps(&meta_ops_);
} }
void Write(const W& msg, void* tag) GRPC_OVERRIDE { void Write(const W& msg, void* tag) GRPC_OVERRIDE {
collection_->write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&collection_->write_ops_); call_.PerformOps(&write_ops_);
} }
void WritesDone(void* tag) GRPC_OVERRIDE { void WritesDone(void* tag) GRPC_OVERRIDE {
collection_->writes_done_ops_.set_output_tag(tag); writes_done_ops_.set_output_tag(tag);
collection_->writes_done_ops_.ClientSendClose(); writes_done_ops_.ClientSendClose();
call_.PerformOps(&collection_->writes_done_ops_); call_.PerformOps(&writes_done_ops_);
} }
void Finish(Status* status, void* tag) GRPC_OVERRIDE { void Finish(Status* status, void* tag) GRPC_OVERRIDE {
collection_->finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
collection_->finish_ops_.RecvInitialMetadata(context_); finish_ops_.RecvInitialMetadata(context_);
} }
collection_->finish_ops_.ClientRecvStatus(context_, status); finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&collection_->finish_ops_); call_.PerformOps(&finish_ops_);
} }
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
class CallOpSetCollection : public CallOpSetCollectionInterface {
public:
void SetCollection() {
init_ops_.SetCollection(shared_from_this());
meta_ops_.SetCollection(shared_from_this());
write_ops_.SetCollection(shared_from_this());
writes_done_ops_.SetCollection(shared_from_this());
finish_ops_.SetCollection(shared_from_this());
}
CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpSendMessage> write_ops_; CallOpSet<CallOpSendMessage> write_ops_;
CallOpSet<CallOpClientSendClose> writes_done_ops_; CallOpSet<CallOpClientSendClose> writes_done_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
CallOpClientRecvStatus> finish_ops_; CallOpClientRecvStatus> finish_ops_;
};
std::shared_ptr<CallOpSetCollection> collection_;
}; };
/// Client-side interface for asynchronous bi-directional streaming. /// Client-side interface for asynchronous bi-directional streaming.
@ -263,75 +236,60 @@ class ClientAsyncReaderWriter GRPC_FINAL
ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
void* tag) void* tag)
: context_(context), : context_(context), call_(channel->CreateCall(method, context, cq)) {
call_(channel->CreateCall(method, context, cq)), init_ops_.set_output_tag(tag);
collection_(new CallOpSetCollection) { init_ops_.SendInitialMetadata(context->send_initial_metadata_);
collection_->SetCollection(); call_.PerformOps(&init_ops_);
collection_->init_ops_.set_output_tag(tag);
collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&collection_->init_ops_);
} }
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_); GPR_ASSERT(!context_->initial_metadata_received_);
collection_->meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
collection_->meta_ops_.RecvInitialMetadata(context_); meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&collection_->meta_ops_); call_.PerformOps(&meta_ops_);
} }
void Read(R* msg, void* tag) GRPC_OVERRIDE { void Read(R* msg, void* tag) GRPC_OVERRIDE {
collection_->read_ops_.set_output_tag(tag); read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
collection_->read_ops_.RecvInitialMetadata(context_); read_ops_.RecvInitialMetadata(context_);
} }
collection_->read_ops_.RecvMessage(msg); read_ops_.RecvMessage(msg);
call_.PerformOps(&collection_->read_ops_); call_.PerformOps(&read_ops_);
} }
void Write(const W& msg, void* tag) GRPC_OVERRIDE { void Write(const W& msg, void* tag) GRPC_OVERRIDE {
collection_->write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&collection_->write_ops_); call_.PerformOps(&write_ops_);
} }
void WritesDone(void* tag) GRPC_OVERRIDE { void WritesDone(void* tag) GRPC_OVERRIDE {
collection_->writes_done_ops_.set_output_tag(tag); writes_done_ops_.set_output_tag(tag);
collection_->writes_done_ops_.ClientSendClose(); writes_done_ops_.ClientSendClose();
call_.PerformOps(&collection_->writes_done_ops_); call_.PerformOps(&writes_done_ops_);
} }
void Finish(Status* status, void* tag) GRPC_OVERRIDE { void Finish(Status* status, void* tag) GRPC_OVERRIDE {
collection_->finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
collection_->finish_ops_.RecvInitialMetadata(context_); finish_ops_.RecvInitialMetadata(context_);
} }
collection_->finish_ops_.ClientRecvStatus(context_, status); finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&collection_->finish_ops_); call_.PerformOps(&finish_ops_);
} }
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
class CallOpSetCollection : public CallOpSetCollectionInterface {
public:
void SetCollection() {
init_ops_.SetCollection(shared_from_this());
meta_ops_.SetCollection(shared_from_this());
read_ops_.SetCollection(shared_from_this());
write_ops_.SetCollection(shared_from_this());
writes_done_ops_.SetCollection(shared_from_this());
finish_ops_.SetCollection(shared_from_this());
}
CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendMessage> write_ops_; CallOpSet<CallOpSendMessage> write_ops_;
CallOpSet<CallOpClientSendClose> writes_done_ops_; CallOpSet<CallOpClientSendClose> writes_done_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
std::shared_ptr<CallOpSetCollection> collection_;
}; };
template <class W, class R> template <class W, class R>
@ -339,53 +297,48 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> { public AsyncReaderInterface<R> {
public: public:
explicit ServerAsyncReader(ServerContext* ctx) explicit ServerAsyncReader(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
ctx_(ctx),
collection_(new CallOpSetCollection) {
collection_->SetCollection();
}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE { void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_); GPR_ASSERT(!ctx_->sent_initial_metadata_);
collection_->meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&collection_->meta_ops_); call_.PerformOps(&meta_ops_);
} }
void Read(R* msg, void* tag) GRPC_OVERRIDE { void Read(R* msg, void* tag) GRPC_OVERRIDE {
collection_->read_ops_.set_output_tag(tag); read_ops_.set_output_tag(tag);
collection_->read_ops_.RecvMessage(msg); read_ops_.RecvMessage(msg);
call_.PerformOps(&collection_->read_ops_); call_.PerformOps(&read_ops_);
} }
void Finish(const W& msg, const Status& status, void* tag) { void Finish(const W& msg, const Status& status, void* tag) {
collection_->finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
// The response is dropped if the status is not OK. // The response is dropped if the status is not OK.
if (status.ok()) { if (status.ok()) {
collection_->finish_ops_.ServerSendStatus( finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg)); finish_ops_.SendMessage(msg));
} else { } else {
collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
status);
} }
call_.PerformOps(&collection_->finish_ops_); call_.PerformOps(&finish_ops_);
} }
void FinishWithError(const Status& status, void* tag) { void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok()); GPR_ASSERT(!status.ok());
collection_->finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&collection_->finish_ops_); call_.PerformOps(&finish_ops_);
} }
private: private:
@ -393,19 +346,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_; Call call_;
ServerContext* ctx_; ServerContext* ctx_;
class CallOpSetCollection : public CallOpSetCollectionInterface {
public:
void SetCollection() {
meta_ops_.SetCollection(shared_from_this());
read_ops_.SetCollection(shared_from_this());
finish_ops_.SetCollection(shared_from_this());
}
CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> finish_ops_; CallOpServerSendStatus> finish_ops_;
};
std::shared_ptr<CallOpSetCollection> collection_;
}; };
template <class W> template <class W>
@ -413,40 +357,36 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> { public AsyncWriterInterface<W> {
public: public:
explicit ServerAsyncWriter(ServerContext* ctx) explicit ServerAsyncWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
ctx_(ctx),
collection_(new CallOpSetCollection) {
collection_->SetCollection();
}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE { void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_); GPR_ASSERT(!ctx_->sent_initial_metadata_);
collection_->meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&collection_->meta_ops_); call_.PerformOps(&meta_ops_);
} }
void Write(const W& msg, void* tag) GRPC_OVERRIDE { void Write(const W& msg, void* tag) GRPC_OVERRIDE {
collection_->write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&collection_->write_ops_); call_.PerformOps(&write_ops_);
} }
void Finish(const Status& status, void* tag) { void Finish(const Status& status, void* tag) {
collection_->finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&collection_->finish_ops_); call_.PerformOps(&finish_ops_);
} }
private: private:
@ -454,18 +394,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_; Call call_;
ServerContext* ctx_; ServerContext* ctx_;
class CallOpSetCollection : public CallOpSetCollectionInterface {
public:
void SetCollection() {
meta_ops_.SetCollection(shared_from_this());
write_ops_.SetCollection(shared_from_this());
finish_ops_.SetCollection(shared_from_this());
}
CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
std::shared_ptr<CallOpSetCollection> collection_;
}; };
/// Server-side interface for asynchronous bi-directional streaming. /// Server-side interface for asynchronous bi-directional streaming.
@ -475,46 +406,42 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> { public AsyncReaderInterface<R> {
public: public:
explicit ServerAsyncReaderWriter(ServerContext* ctx) explicit ServerAsyncReaderWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
ctx_(ctx),
collection_(new CallOpSetCollection) {
collection_->SetCollection();
}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE { void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_); GPR_ASSERT(!ctx_->sent_initial_metadata_);
collection_->meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&collection_->meta_ops_); call_.PerformOps(&meta_ops_);
} }
void Read(R* msg, void* tag) GRPC_OVERRIDE { void Read(R* msg, void* tag) GRPC_OVERRIDE {
collection_->read_ops_.set_output_tag(tag); read_ops_.set_output_tag(tag);
collection_->read_ops_.RecvMessage(msg); read_ops_.RecvMessage(msg);
call_.PerformOps(&collection_->read_ops_); call_.PerformOps(&read_ops_);
} }
void Write(const W& msg, void* tag) GRPC_OVERRIDE { void Write(const W& msg, void* tag) GRPC_OVERRIDE {
collection_->write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&collection_->write_ops_); call_.PerformOps(&write_ops_);
} }
void Finish(const Status& status, void* tag) { void Finish(const Status& status, void* tag) {
collection_->finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&collection_->finish_ops_); call_.PerformOps(&finish_ops_);
} }
private: private:
@ -524,20 +451,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_; Call call_;
ServerContext* ctx_; ServerContext* ctx_;
class CallOpSetCollection : public CallOpSetCollectionInterface {
public:
void SetCollection() {
meta_ops_.SetCollection(shared_from_this());
read_ops_.SetCollection(shared_from_this());
write_ops_.SetCollection(shared_from_this());
finish_ops_.SetCollection(shared_from_this());
}
CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
std::shared_ptr<CallOpSetCollection> collection_;
}; };
} // namespace grpc } // namespace grpc

@ -116,47 +116,42 @@ class ServerAsyncResponseWriter GRPC_FINAL
: public ServerAsyncStreamingInterface { : public ServerAsyncStreamingInterface {
public: public:
explicit ServerAsyncResponseWriter(ServerContext* ctx) explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
ctx_(ctx),
collection_(new CallOpSetCollection) {
collection_->SetCollection();
}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE { void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_); GPR_ASSERT(!ctx_->sent_initial_metadata_);
collection_->meta_buf_.set_output_tag(tag); meta_buf_.set_output_tag(tag);
collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&collection_->meta_buf_); call_.PerformOps(&meta_buf_);
} }
void Finish(const W& msg, const Status& status, void* tag) { void Finish(const W& msg, const Status& status, void* tag) {
collection_->finish_buf_.set_output_tag(tag); finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
// The response is dropped if the status is not OK. // The response is dropped if the status is not OK.
if (status.ok()) { if (status.ok()) {
collection_->finish_buf_.ServerSendStatus( finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg)); finish_buf_.SendMessage(msg));
} else { } else {
collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
status);
} }
call_.PerformOps(&collection_->finish_buf_); call_.PerformOps(&finish_buf_);
} }
void FinishWithError(const Status& status, void* tag) { void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok()); GPR_ASSERT(!status.ok());
collection_->finish_buf_.set_output_tag(tag); finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) { if (!ctx_->sent_initial_metadata_) {
collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true; ctx_->sent_initial_metadata_ = true;
} }
collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&collection_->finish_buf_); call_.PerformOps(&finish_buf_);
} }
private: private:
@ -164,17 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
Call call_; Call call_;
ServerContext* ctx_; ServerContext* ctx_;
class CallOpSetCollection : public CallOpSetCollectionInterface {
public:
void SetCollection() {
meta_buf_.SetCollection(shared_from_this());
finish_buf_.SetCollection(shared_from_this());
}
CallOpSet<CallOpSendInitialMetadata> meta_buf_; CallOpSet<CallOpSendInitialMetadata> meta_buf_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> finish_buf_; CallOpServerSendStatus> finish_buf_;
};
std::shared_ptr<CallOpSetCollection> collection_;
}; };
} // namespace grpc } // namespace grpc

@ -477,7 +477,9 @@ class CallOpClientRecvStatus {
/// of the group should have a shared_ptr back to the collection, /// of the group should have a shared_ptr back to the collection,
/// as will the object that instantiates the collection, allowing /// as will the object that instantiates the collection, allowing
/// for automatic ref-counting. In practice, any actual use should /// for automatic ref-counting. In practice, any actual use should
/// derive from this base class /// derive from this base class. This is specifically necessary if
/// some of the CallOpSet's in the collection are "Sneaky" and don't
/// report back to the C++ layer CQ operations
class CallOpSetCollectionInterface class CallOpSetCollectionInterface
: public std::enable_shared_from_this<CallOpSetCollectionInterface> {}; : public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
@ -497,7 +499,7 @@ class CallOpSetInterface : public CompletionQueueTag {
max_message_size_ = max_message_size; max_message_size_ = max_message_size;
} }
/// Mark this as belonging to a collection /// Mark this as belonging to a collection if needed
void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) { void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
collection_ = collection; collection_ = collection;
} }

Loading…
Cancel
Save