diff --git a/include/grpc++/call.h b/include/grpc++/call.h index 704cd47daec..8cddf78a6be 100644 --- a/include/grpc++/call.h +++ b/include/grpc++/call.h @@ -35,7 +35,9 @@ #define __GRPCPP_CALL_H__ #include -#include +#include + +#include namespace google { namespace protobuf { @@ -44,35 +46,47 @@ class Message; } // namespace google struct grpc_call; +struct grpc_op; namespace grpc { class ChannelInterface; -class CallOpBuffer final { +class CallOpBuffer final : public CompletionQueueTag { public: void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); void AddClientSendClose(); void AddClientRecvStatus(Status *status); - void FinalizeResult(); + // INTERNAL API: - private: - static const size_t MAX_OPS = 6; - grpc_op ops_[MAX_OPS]; - int num_ops_ = 0; + // Convert to an array of grpc_op elements + void FillOps(grpc_op *ops, size_t *nops); + + // Called by completion queue just prior to returning from Next() or Pluck() + void FinalizeResult() override; +}; + +class CCallDeleter { + public: + void operator()(grpc_call *c); }; // Straightforward wrapping of the C call object class Call final { public: - Call(grpc_call *call, ChannelInterface *channel); + Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq); + + void PerformOps(CallOpBuffer *buffer, void *tag); - void PerformOps(const CallOpBuffer &buffer, void *tag); + grpc_call *call() { return call_.get(); } + CompletionQueue *cq() { return cq_; } private: - ChannelInterface *const channel_; + ChannelInterface *channel_; + CompletionQueue *cq_; + std::unique_ptr call_; }; } // namespace grpc diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 4f1e1911e6a..452c7857339 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -45,7 +45,8 @@ class Message; struct grpc_call; namespace grpc { - +class Call; +class CallOpBuffer; class ClientContext; class CompletionQueue; class RpcMethod; @@ -56,8 +57,9 @@ class ChannelInterface { public: virtual ~ChannelInterface() {} - virtual grpc_call *CreateCall(const RpcMethod &method, ClientContext *context, - CompletionQueue *cq); + virtual Call CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) = 0; + virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag, Call *call) = 0; }; // Wrapper that begins an asynchronous unary call diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index a60d27f4382..4bc707e5536 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -34,12 +34,21 @@ #ifndef __GRPCPP_COMPLETION_QUEUE_H__ #define __GRPCPP_COMPLETION_QUEUE_H__ -#include - struct grpc_completion_queue; namespace grpc { +class CompletionQueue; + +class CompletionQueueTag { + public: + virtual void FinalizeResult() = 0; + + private: + friend class CompletionQueue; + void *user_tag_; +}; + // grpc_completion_queue wrapper class class CompletionQueue { public: @@ -62,12 +71,9 @@ class CompletionQueue { // is returned. // MUST be used for all events that could be surfaced through this // wrapping API - template - void *PrepareTagForC(void *user_tag, F on_ready) { - return new std::function([user_tag, on_ready]() { - on_ready(); - return user_tag; - }); + void *PrepareTagForC(CompletionQueueTag *cq_tag, void *user_tag) { + cq_tag->user_tag_ = user_tag; + return cq_tag; } // Shutdown has to be called, and the CompletionQueue can only be @@ -77,8 +83,6 @@ class CompletionQueue { grpc_completion_queue* cq() { return cq_; } private: - typedef std::function FinishFunc; - grpc_completion_queue* cq_; // owned }; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 5c538431f54..dce07b69592 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -88,18 +88,18 @@ class ClientReader final : public ClientStreamingInterface, explicit ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const google::protobuf::Message &request) - : call_(channel->CreateCall(method, context, &cq_), channel) { + : call_(channel->CreateCall(method, context, &cq_)) { CallOpBuffer buf; buf.AddSendMessage(request); buf.AddClientSendClose(); - call_.PerformOps(buf, (void *)1); + call_.PerformOps(&buf, (void *)1); cq_.Pluck((void *)1); } virtual bool Read(R *msg) { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_.PerformOps(buf, (void *)2); + call_.PerformOps(&buf, (void *)2); return cq_.Pluck((void *)2); } @@ -107,7 +107,7 @@ class ClientReader final : public ClientStreamingInterface, CallOpBuffer buf; Status status; buf.AddClientRecvStatus(&status); - call_.PerformOps(buf, (void *)3); + call_.PerformOps(&buf, (void *)3); GPR_ASSERT(cq_.Pluck((void *)3)); return status; } @@ -126,19 +126,19 @@ class ClientWriter final : public ClientStreamingInterface, ClientContext *context, google::protobuf::Message *response) : response_(response), - call_(channel->CreateCall(method, context, &cq_), channel) {} + call_(channel->CreateCall(method, context, &cq_)) {} virtual bool Write(const W& msg) { CallOpBuffer buf; buf.AddSendMessage(msg); - call_.PerformOps(buf, (void *)2); + call_.PerformOps(&buf, (void *)2); return cq_.Pluck((void *)2); } virtual bool WritesDone() { CallOpBuffer buf; buf.AddClientSendClose(); - call_.PerformOps(buf, (void *)3); + call_.PerformOps(&buf, (void *)3); return cq_.Pluck((void *)3); } @@ -147,7 +147,7 @@ class ClientWriter final : public ClientStreamingInterface, CallOpBuffer buf; Status status; buf.AddClientRecvStatus(&status); - call_.PerformOps(buf, (void *)4); + call_.PerformOps(&buf, (void *)4); GPR_ASSERT(cq_.Pluck((void *)4)); return status; } @@ -167,26 +167,26 @@ class ClientReaderWriter final : public ClientStreamingInterface, // Blocking create a stream. explicit ClientReaderWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context) - : call_(channel->CreateCall(method, context, &cq_), channel) {} + : call_(channel->CreateCall(method, context, &cq_)) {} virtual bool Read(R *msg) { CallOpBuffer buf; buf.AddRecvMessage(msg); - call_.PerformOps(buf, (void *)2); + call_.PerformOps(&buf, (void *)2); return cq_.Pluck((void *)2); } virtual bool Write(const W& msg) { CallOpBuffer buf; buf.AddSendMessage(msg); - call_.PerformOps(buf, (void *)3); + call_.PerformOps(&buf, (void *)3); return cq_.Pluck((void *)3); } virtual bool WritesDone() { CallOpBuffer buf; buf.AddClientSendClose(); - call_.PerformOps(buf, (void *)4); + call_.PerformOps(&buf, (void *)4); return cq_.Pluck((void *)4); } @@ -194,7 +194,7 @@ class ClientReaderWriter final : public ClientStreamingInterface, CallOpBuffer buf; Status status; buf.AddClientRecvStatus(&status); - call_.PerformOps(buf, (void *)5); + call_.PerformOps(&buf, (void *)5); GPR_ASSERT(cq_.Pluck((void *)5)); return status; } diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 2bc1001935c..b5132129033 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -43,8 +43,10 @@ #include "src/cpp/proto/proto_utils.h" #include "src/cpp/stream/stream_context.h" +#include #include #include +#include #include #include #include @@ -77,13 +79,23 @@ Channel::Channel(const grpc::string &target, Channel::~Channel() { grpc_channel_destroy(c_channel_); } -grpc_call *Channel::CreateCall(const RpcMethod &method, ClientContext *context, - CompletionQueue *cq) { +Call Channel::CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) { auto c_call = grpc_channel_create_call(c_channel_, cq->cq(), method.name(), target_.c_str(), context->RawDeadline()); context->set_call(c_call); - return c_call; + return Call(c_call, this, cq); +} + +void Channel::PerformOpsOnCall(CallOpBuffer *buf, void *tag, Call *call) { + static const size_t MAX_OPS = 8; + size_t nops = MAX_OPS; + grpc_op ops[MAX_OPS]; + buf->FillOps(ops, &nops); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call->call(), ops, nops, + call->cq()->PrepareTagForC(buf, tag))); } } // namespace grpc diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index 84014b3cea7..6cf222883c2 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -42,6 +42,8 @@ struct grpc_channel; namespace grpc { +class Call; +class CallOpBuffer; class ChannelArguments; class CompletionQueue; class Credentials; @@ -55,8 +57,10 @@ class Channel final : public ChannelInterface { ~Channel() override; - virtual grpc_call *CreateCall(const RpcMethod &method, ClientContext *context, - CompletionQueue *cq); + virtual Call CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) override; + virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag, + Call *call) override; private: const grpc::string target_; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index f415f7e72ff..6ae6c6cdb8d 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -32,10 +32,11 @@ */ #include +#include namespace grpc { -void Call::PerformOps(const CallOpBuffer& buffer, void* tag) { +void Call::PerformOps(CallOpBuffer* buffer, void* tag) { channel_->PerformOpsOnCall(buffer, tag, call_); } diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 72edfeb14e4..383b66c519d 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -62,9 +62,10 @@ bool CompletionQueue::Next(void **tag, bool *ok) { if (ev->type == GRPC_QUEUE_SHUTDOWN) { return false; } - std::unique_ptr func(static_cast(ev->tag)); - *tag = (*func)(); - *ok = (ev->data.op_complete == GRPC_OP_OK); + auto cq_tag = static_cast(ev->tag); + cq_tag->FinalizeResult(); + *tag = cq_tag->user_tag_; + *ok = ev->status.op_complete == GRPC_OP_OK; return true; }