diff --git a/build.json b/build.json index 68110e47020..7d35f79af09 100644 --- a/build.json +++ b/build.json @@ -413,6 +413,7 @@ "src/cpp/client/create_channel.cc", "src/cpp/client/credentials.cc", "src/cpp/client/internal_stub.cc", + "src/cpp/common/call.cc", "src/cpp/common/completion_queue.cc", "src/cpp/common/rpc_method.cc", "src/cpp/proto/proto_utils.cc", diff --git a/include/grpc++/call.h b/include/grpc++/call.h new file mode 100644 index 00000000000..704cd47daec --- /dev/null +++ b/include/grpc++/call.h @@ -0,0 +1,80 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_CALL_H__ +#define __GRPCPP_CALL_H__ + +#include +#include + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +struct grpc_call; + +namespace grpc { + +class ChannelInterface; + +class CallOpBuffer final { + public: + void AddSendMessage(const google::protobuf::Message &message); + void AddRecvMessage(google::protobuf::Message *message); + void AddClientSendClose(); + void AddClientRecvStatus(Status *status); + + void FinalizeResult(); + + private: + static const size_t MAX_OPS = 6; + grpc_op ops_[MAX_OPS]; + int num_ops_ = 0; +}; + +// Straightforward wrapping of the C call object +class Call final { + public: + Call(grpc_call *call, ChannelInterface *channel); + + void PerformOps(const CallOpBuffer &buffer, void *tag); + + private: + ChannelInterface *const channel_; +}; + +} // namespace grpc + +#endif // __GRPCPP_CALL_INTERFACE_H__ diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 9ed35422b85..4f1e1911e6a 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -39,30 +39,40 @@ namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google + +struct grpc_call; namespace grpc { class ClientContext; +class CompletionQueue; class RpcMethod; class StreamContextInterface; +class CallInterface; class ChannelInterface { public: virtual ~ChannelInterface() {} - virtual Status StartBlockingRpc(const RpcMethod& method, - ClientContext* context, - const google::protobuf::Message& request, - google::protobuf::Message* result) = 0; - - virtual StreamContextInterface* CreateStream( - const RpcMethod& method, ClientContext* context, - const google::protobuf::Message* request, - google::protobuf::Message* result) = 0; + virtual grpc_call *CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq); }; +// Wrapper that begins an asynchronous unary call +void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result, Status *status, + CompletionQueue *cq, void *tag); + +// Wrapper that performs a blocking unary call +Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + } // namespace grpc #endif // __GRPCPP_CHANNEL_INTERFACE_H__ diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 4e8c1071c03..a60d27f4382 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -49,7 +49,9 @@ class CompletionQueue { // Blocking read from queue. // Returns true if an event was received, false if the queue is ready // for destruction. - bool Next(void** tag); + bool Next(void **tag, bool *ok); + + bool Pluck(void *tag); // Prepare a tag for the C api // Given a tag we'd like to receive from Next, what tag should we pass diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 5fa371ba626..3931d9a1bce 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -48,8 +48,8 @@ struct grpc_server; namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google namespace grpc { class AsyncServerContext; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index b8982f4d93d..5c538431f54 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -34,6 +34,9 @@ #ifndef __GRPCPP_STREAM_H__ #define __GRPCPP_STREAM_H__ +#include +#include +#include #include #include #include @@ -45,16 +48,12 @@ class ClientStreamingInterface { public: virtual ~ClientStreamingInterface() {} - // Try to cancel the stream. Wait() still needs to be called to get the final - // status. Cancelling after the stream has finished has no effects. - virtual void Cancel() = 0; - // Wait until the stream finishes, and return the final status. When the // client side declares it has no more message to send, either implicitly or // by calling WritesDone, it needs to make sure there is no more message to // be received from the server, either implicitly or by getting a false from // a Read(). Otherwise, this implicitly cancels the stream. - virtual const Status& Wait() = 0; + virtual Status Finish() = 0; }; // An interface that yields a sequence of R messages. @@ -82,95 +81,127 @@ class WriterInterface { }; template -class ClientReader : public ClientStreamingInterface, - public ReaderInterface { +class ClientReader final : public ClientStreamingInterface, + public ReaderInterface { public: // Blocking create a stream and write the first request out. - explicit ClientReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Write(context_->request(), true); + explicit ClientReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request) + : call_(channel->CreateCall(method, context, &cq_), channel) { + CallOpBuffer buf; + buf.AddSendMessage(request); + buf.AddClientSendClose(); + call_.PerformOps(buf, (void *)1); + cq_.Pluck((void *)1); } - ~ClientReader() { delete context_; } - - virtual bool Read(R* msg) { return context_->Read(msg); } - - virtual void Cancel() { context_->Cancel(); } + virtual bool Read(R *msg) { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(buf, (void *)2); + return cq_.Pluck((void *)2); + } - virtual const Status& Wait() { return context_->Wait(); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(buf, (void *)3); + GPR_ASSERT(cq_.Pluck((void *)3)); + return status; + } private: - StreamContextInterface* const context_; + CompletionQueue cq_; + Call call_; }; template -class ClientWriter : public ClientStreamingInterface, - public WriterInterface { +class ClientWriter final : public ClientStreamingInterface, + public WriterInterface { public: // Blocking create a stream. - explicit ClientWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); - } - - ~ClientWriter() { delete context_; } + explicit ClientWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_), channel) {} virtual bool Write(const W& msg) { - return context_->Write(const_cast(&msg), false); + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(buf, (void *)2); + return cq_.Pluck((void *)2); } - virtual void WritesDone() { context_->Write(nullptr, true); } - - virtual void Cancel() { context_->Cancel(); } + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(buf, (void *)3); + return cq_.Pluck((void *)3); + } // Read the final response and wait for the final status. - virtual const Status& Wait() { - bool success = context_->Read(context_->response()); - if (!success) { - Cancel(); - } else { - success = context_->Read(nullptr); - if (success) { - Cancel(); - } - } - return context_->Wait(); + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(buf, (void *)4); + GPR_ASSERT(cq_.Pluck((void *)4)); + return status; } private: - StreamContextInterface* const context_; + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; }; // Client-side interface for bi-directional streaming. template -class ClientReaderWriter : public ClientStreamingInterface, - public WriterInterface, - public ReaderInterface { +class ClientReaderWriter final : public ClientStreamingInterface, + public WriterInterface, + public ReaderInterface { public: // Blocking create a stream. - explicit ClientReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + explicit ClientReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_), channel) {} + + virtual bool Read(R *msg) { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(buf, (void *)2); + return cq_.Pluck((void *)2); } - ~ClientReaderWriter() { delete context_; } - - virtual bool Read(R* msg) { return context_->Read(msg); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast(&msg), false); + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(buf, (void *)3); + return cq_.Pluck((void *)3); } - virtual void WritesDone() { context_->Write(nullptr, true); } - - virtual void Cancel() { context_->Cancel(); } + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(buf, (void *)4); + return cq_.Pluck((void *)4); + } - virtual const Status& Wait() { return context_->Wait(); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(buf, (void *)5); + GPR_ASSERT(cq_.Pluck((void *)5)); + return status; + } private: - StreamContextInterface* const context_; + CompletionQueue cq_; + Call call_; }; template diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 8724f97e8be..cd537f9e8c3 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -268,7 +268,7 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); printer->Print(*vars, - " return channel()->StartBlockingRpc(" + "return ::grpc::BlockingUnaryCall(channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\"), " "context, request, response);\n" "}\n\n"); @@ -279,10 +279,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Print(*vars, " return new ::grpc::ClientWriter< $Request$>(" - "channel()->CreateStream(" + "channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " - "context, nullptr, response));\n" + "context, response);\n" "}\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( @@ -291,10 +291,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::ClientContext* context, const $Request$* request) {\n"); printer->Print(*vars, " return new ::grpc::ClientReader< $Response$>(" - "channel()->CreateStream(" + "channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " - "context, request, nullptr));\n" + "context, *request);\n" "}\n\n"); } else if (BidiStreaming(method)) { printer->Print( @@ -304,10 +304,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, printer->Print( *vars, " return new ::grpc::ClientReaderWriter< $Request$, $Response$>(" - "channel()->CreateStream(" + "channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " - "context, nullptr, nullptr));\n" + "context);\n" "}\n\n"); } } diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 3f39364bda2..2bc1001935c 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -77,103 +77,13 @@ Channel::Channel(const grpc::string &target, Channel::~Channel() { grpc_channel_destroy(c_channel_); } -namespace { -// Pluck the finished event and set to status when it is not nullptr. -void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag, - Status *status) { - grpc_event *ev = - grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future); - if (status) { - StatusCode error_code = static_cast(ev->data.finished.status); - grpc::string details(ev->data.finished.details ? ev->data.finished.details - : ""); - *status = Status(error_code, details); - } - grpc_event_finish(ev); -} -} // namespace - -// TODO(yangg) more error handling -Status Channel::StartBlockingRpc(const RpcMethod &method, - ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result) { - Status status; - grpc_call *call = grpc_channel_create_call_old( - c_channel_, method.name(), target_.c_str(), context->RawDeadline()); - context->set_call(call); - - grpc_event *ev; - void *finished_tag = reinterpret_cast(call); - void *metadata_read_tag = reinterpret_cast(call) + 2; - void *write_tag = reinterpret_cast(call) + 3; - void *halfclose_tag = reinterpret_cast(call) + 4; - void *read_tag = reinterpret_cast(call) + 5; - - grpc_completion_queue *cq = grpc_completion_queue_create(); - context->set_cq(cq); - // add_metadata from context - // - // invoke - GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - // write request - grpc_byte_buffer *write_buffer = nullptr; - bool success = SerializeProto(request, &write_buffer); - if (!success) { - grpc_call_cancel(call); - status = - Status(StatusCode::DATA_LOSS, "Failed to serialize request proto."); - GetFinalStatus(cq, finished_tag, nullptr); - return status; - } - GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - grpc_byte_buffer_destroy(write_buffer); - ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future); - - success = ev->data.write_accepted == GRPC_OP_OK; - grpc_event_finish(ev); - if (!success) { - GetFinalStatus(cq, finished_tag, &status); - return status; - } - // writes done - GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future); - grpc_event_finish(ev); - // start read metadata - // - ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future); - grpc_event_finish(ev); - // start read - GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future); - if (ev->data.read) { - if (!DeserializeProto(ev->data.read, result)) { - grpc_event_finish(ev); - status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto."); - GetFinalStatus(cq, finished_tag, nullptr); - return status; - } - } - grpc_event_finish(ev); - - // wait status - GetFinalStatus(cq, finished_tag, &status); - return status; -} - -StreamContextInterface *Channel::CreateStream( - const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result) { - grpc_call *call = grpc_channel_create_call_old( - c_channel_, method.name(), target_.c_str(), context->RawDeadline()); - context->set_call(call); - grpc_completion_queue *cq = grpc_completion_queue_create(); - context->set_cq(cq); - return new StreamContext(method, context, request, result); +grpc_call *Channel::CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) { + auto c_call = + grpc_channel_create_call(c_channel_, cq->cq(), method.name(), + target_.c_str(), context->RawDeadline()); + context->set_call(c_call); + return c_call; } } // namespace grpc diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index 67d18bf4c89..84014b3cea7 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -43,10 +43,11 @@ struct grpc_channel; namespace grpc { class ChannelArguments; +class CompletionQueue; class Credentials; class StreamContextInterface; -class Channel : public ChannelInterface { +class Channel final : public ChannelInterface { public: Channel(const grpc::string &target, const ChannelArguments &args); Channel(const grpc::string &target, const std::unique_ptr &creds, @@ -54,14 +55,8 @@ class Channel : public ChannelInterface { ~Channel() override; - Status StartBlockingRpc(const RpcMethod &method, ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result) override; - - StreamContextInterface *CreateStream( - const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result) override; + virtual grpc_call *CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq); private: const grpc::string target_; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc new file mode 100644 index 00000000000..f415f7e72ff --- /dev/null +++ b/src/cpp/common/call.cc @@ -0,0 +1,42 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +namespace grpc { + +void Call::PerformOps(const CallOpBuffer& buffer, void* tag) { + channel_->PerformOpsOnCall(buffer, tag, call_); +} + +} // namespace grpc diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index a1a858ae2e0..72edfeb14e4 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -55,19 +55,16 @@ class EventDeleter { void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); } }; -bool CompletionQueue::Next(void **tag) { +bool CompletionQueue::Next(void **tag, bool *ok) { std::unique_ptr ev; ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future)); - if (!ev) { - gpr_log(GPR_ERROR, "no next event in queue"); - abort(); - } if (ev->type == GRPC_QUEUE_SHUTDOWN) { return false; } std::unique_ptr func(static_cast(ev->tag)); *tag = (*func)(); + *ok = (ev->data.op_complete == GRPC_OP_OK); return true; } diff --git a/src/cpp/server/server_rpc_handler.h b/src/cpp/server/server_rpc_handler.h index a43e07dc5f9..ec8ec2c330b 100644 --- a/src/cpp/server/server_rpc_handler.h +++ b/src/cpp/server/server_rpc_handler.h @@ -53,7 +53,6 @@ class ServerRpcHandler { void StartRpc(); private: - CompletionQueue::CompletionType WaitForNextEvent(); void FinishRpc(const Status &status); std::unique_ptr async_server_context_;