Merge branch 'c++api' of github.com:ctiller/grpc into c++api

pull/501/head
Yang Gao 10 years ago
commit 76bacbf358
  1. 3
      Makefile
  2. 1
      build.json
  3. 96
      include/grpc++/call.h
  4. 36
      include/grpc++/channel_interface.h
  5. 58
      include/grpc++/completion_queue.h
  6. 3
      include/grpc++/config.h
  7. 4
      include/grpc++/server.h
  8. 216
      include/grpc++/stream.h
  9. 142
      src/compiler/cpp_generator.cc
  10. 112
      src/cpp/client/channel.cc
  11. 17
      src/cpp/client/channel.h
  12. 43
      src/cpp/common/call.cc
  13. 77
      src/cpp/common/completion_queue.cc
  14. 1
      src/cpp/server/server_rpc_handler.h

@ -2614,6 +2614,7 @@ LIBGRPC++_SRC = \
src/cpp/client/create_channel.cc \ src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \ src/cpp/client/credentials.cc \
src/cpp/client/internal_stub.cc \ src/cpp/client/internal_stub.cc \
src/cpp/common/call.cc \
src/cpp/common/completion_queue.cc \ src/cpp/common/completion_queue.cc \
src/cpp/common/rpc_method.cc \ src/cpp/common/rpc_method.cc \
src/cpp/proto/proto_utils.cc \ src/cpp/proto/proto_utils.cc \
@ -2673,6 +2674,7 @@ src/cpp/client/client_context.cc: $(OPENSSL_DEP)
src/cpp/client/create_channel.cc: $(OPENSSL_DEP) src/cpp/client/create_channel.cc: $(OPENSSL_DEP)
src/cpp/client/credentials.cc: $(OPENSSL_DEP) src/cpp/client/credentials.cc: $(OPENSSL_DEP)
src/cpp/client/internal_stub.cc: $(OPENSSL_DEP) src/cpp/client/internal_stub.cc: $(OPENSSL_DEP)
src/cpp/common/call.cc: $(OPENSSL_DEP)
src/cpp/common/completion_queue.cc: $(OPENSSL_DEP) src/cpp/common/completion_queue.cc: $(OPENSSL_DEP)
src/cpp/common/rpc_method.cc: $(OPENSSL_DEP) src/cpp/common/rpc_method.cc: $(OPENSSL_DEP)
src/cpp/proto/proto_utils.cc: $(OPENSSL_DEP) src/cpp/proto/proto_utils.cc: $(OPENSSL_DEP)
@ -2733,6 +2735,7 @@ objs/$(CONFIG)/src/cpp/client/client_context.o:
objs/$(CONFIG)/src/cpp/client/create_channel.o: objs/$(CONFIG)/src/cpp/client/create_channel.o:
objs/$(CONFIG)/src/cpp/client/credentials.o: objs/$(CONFIG)/src/cpp/client/credentials.o:
objs/$(CONFIG)/src/cpp/client/internal_stub.o: objs/$(CONFIG)/src/cpp/client/internal_stub.o:
objs/$(CONFIG)/src/cpp/common/call.o:
objs/$(CONFIG)/src/cpp/common/completion_queue.o: objs/$(CONFIG)/src/cpp/common/completion_queue.o:
objs/$(CONFIG)/src/cpp/common/rpc_method.o: objs/$(CONFIG)/src/cpp/common/rpc_method.o:
objs/$(CONFIG)/src/cpp/proto/proto_utils.o: objs/$(CONFIG)/src/cpp/proto/proto_utils.o:

@ -413,6 +413,7 @@
"src/cpp/client/create_channel.cc", "src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc", "src/cpp/client/credentials.cc",
"src/cpp/client/internal_stub.cc", "src/cpp/client/internal_stub.cc",
"src/cpp/common/call.cc",
"src/cpp/common/completion_queue.cc", "src/cpp/common/completion_queue.cc",
"src/cpp/common/rpc_method.cc", "src/cpp/common/rpc_method.cc",
"src/cpp/proto/proto_utils.cc", "src/cpp/proto/proto_utils.cc",

@ -0,0 +1,96 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPCPP_CALL_H__
#define __GRPCPP_CALL_H__
#include <grpc++/status.h>
#include <grpc++/completion_queue.h>
#include <memory>
#include <vector>
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
struct grpc_call;
struct grpc_op;
namespace grpc {
class ChannelInterface;
class CallOpBuffer final : public CompletionQueueTag {
public:
void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
void AddClientRecvStatus(Status *status);
// INTERNAL API:
// Convert to an array of grpc_op elements
void FillOps(grpc_op *ops, size_t *nops);
// Called by completion queue just prior to returning from Next() or Pluck()
void FinalizeResult() override;
};
class CCallDeleter {
public:
void operator()(grpc_call *c);
};
// Straightforward wrapping of the C call object
class Call final {
public:
Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq);
void PerformOps(CallOpBuffer *buffer, void *tag);
grpc_call *call() { return call_.get(); }
CompletionQueue *cq() { return cq_; }
private:
ChannelInterface *channel_;
CompletionQueue *cq_;
std::unique_ptr<grpc_call, CCallDeleter> call_;
};
} // namespace grpc
#endif // __GRPCPP_CALL_INTERFACE_H__

@ -39,30 +39,42 @@
namespace google { namespace google {
namespace protobuf { namespace protobuf {
class Message; class Message;
} } // namespace protobuf
} } // namespace google
namespace grpc { struct grpc_call;
namespace grpc {
class Call;
class CallOpBuffer;
class ClientContext; class ClientContext;
class CompletionQueue;
class RpcMethod; class RpcMethod;
class StreamContextInterface; class StreamContextInterface;
class CallInterface;
class ChannelInterface { class ChannelInterface {
public: public:
virtual ~ChannelInterface() {} virtual ~ChannelInterface() {}
virtual Status StartBlockingRpc(const RpcMethod& method, virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
ClientContext* context, CompletionQueue *cq) = 0;
const google::protobuf::Message& request, virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag, Call *call) = 0;
google::protobuf::Message* result) = 0;
virtual StreamContextInterface* CreateStream(
const RpcMethod& method, ClientContext* context,
const google::protobuf::Message* request,
google::protobuf::Message* result) = 0;
}; };
// Wrapper that begins an asynchronous unary call
void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag);
// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result);
} // namespace grpc } // namespace grpc
#endif // __GRPCPP_CHANNEL_INTERFACE_H__ #endif // __GRPCPP_CHANNEL_INTERFACE_H__

@ -38,42 +38,46 @@ struct grpc_completion_queue;
namespace grpc { namespace grpc {
class CompletionQueue;
class CompletionQueueTag {
public:
virtual void FinalizeResult() = 0;
private:
friend class CompletionQueue;
void *user_tag_;
};
// grpc_completion_queue wrapper class // grpc_completion_queue wrapper class
class CompletionQueue { class CompletionQueue {
public: public:
CompletionQueue(); CompletionQueue();
~CompletionQueue(); ~CompletionQueue();
enum CompletionType {
QUEUE_CLOSED = 0, // Shutting down.
RPC_END = 1, // An RPC finished. Either at client or server.
CLIENT_READ_OK = 2, // A client-side read has finished successfully.
CLIENT_READ_ERROR = 3, // A client-side read has finished with error.
CLIENT_WRITE_OK = 4,
CLIENT_WRITE_ERROR = 5,
SERVER_RPC_NEW = 6, // A new RPC just arrived at the server.
SERVER_READ_OK = 7, // A server-side read has finished successfully.
SERVER_READ_ERROR = 8, // A server-side read has finished with error.
SERVER_WRITE_OK = 9,
SERVER_WRITE_ERROR = 10,
// Client or server has sent half close successfully.
HALFCLOSE_OK = 11,
// New CompletionTypes may be added in the future, so user code should
// always
// handle the default case of a CompletionType that appears after such code
// was
// written.
DO_NOT_USE = 20,
};
// Blocking read from queue. // Blocking read from queue.
// For QUEUE_CLOSED, *tag is not changed. // Returns true if an event was received, false if the queue is ready
// For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext. // for destruction.
// For others, *tag will be the AsyncServerContext of this rpc. bool Next(void **tag, bool *ok);
CompletionType Next(void** tag);
bool Pluck(void *tag);
// Prepare a tag for the C api
// Given a tag we'd like to receive from Next, what tag should we pass
// down to the C api?
// Usage example:
// grpc_call_start_batch(..., cq.PrepareTagForC(tag));
// Allows attaching some work to be executed before the original tag
// is returned.
// MUST be used for all events that could be surfaced through this
// wrapping API
void *PrepareTagForC(CompletionQueueTag *cq_tag, void *user_tag) {
cq_tag->user_tag_ = user_tag;
return cq_tag;
}
// Shutdown has to be called, and the CompletionQueue can only be // Shutdown has to be called, and the CompletionQueue can only be
// destructed when the QUEUE_CLOSED message has been read with Next(). // destructed when false is returned from Next().
void Shutdown(); void Shutdown();
grpc_completion_queue* cq() { return cq_; } grpc_completion_queue* cq() { return cq_; }

@ -39,6 +39,7 @@
namespace grpc { namespace grpc {
typedef std::string string; typedef std::string string;
}
} // namespace grpc
#endif // __GRPCPP_CONFIG_H__ #endif // __GRPCPP_CONFIG_H__

@ -48,8 +48,8 @@ struct grpc_server;
namespace google { namespace google {
namespace protobuf { namespace protobuf {
class Message; class Message;
} } // namespace protobuf
} } // namespace google
namespace grpc { namespace grpc {
class AsyncServerContext; class AsyncServerContext;

@ -34,6 +34,9 @@
#ifndef __GRPCPP_STREAM_H__ #ifndef __GRPCPP_STREAM_H__
#define __GRPCPP_STREAM_H__ #define __GRPCPP_STREAM_H__
#include <grpc++/call.h>
#include <grpc++/channel_interface.h>
#include <grpc++/completion_queue.h>
#include <grpc++/stream_context_interface.h> #include <grpc++/stream_context_interface.h>
#include <grpc++/status.h> #include <grpc++/status.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -45,16 +48,12 @@ class ClientStreamingInterface {
public: public:
virtual ~ClientStreamingInterface() {} virtual ~ClientStreamingInterface() {}
// Try to cancel the stream. Wait() still needs to be called to get the final
// status. Cancelling after the stream has finished has no effects.
virtual void Cancel() = 0;
// Wait until the stream finishes, and return the final status. When the // Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or // client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to // by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from // be received from the server, either implicitly or by getting a false from
// a Read(). Otherwise, this implicitly cancels the stream. // a Read(). Otherwise, this implicitly cancels the stream.
virtual const Status& Wait() = 0; virtual Status Finish() = 0;
}; };
// An interface that yields a sequence of R messages. // An interface that yields a sequence of R messages.
@ -82,101 +81,202 @@ class WriterInterface {
}; };
template <class R> template <class R>
class ClientReader : public ClientStreamingInterface, class ClientReader final : public ClientStreamingInterface,
public ReaderInterface<R> { public ReaderInterface<R> {
public: public:
// Blocking create a stream and write the first request out. // Blocking create a stream and write the first request out.
explicit ClientReader(StreamContextInterface* context) : context_(context) { explicit ClientReader(ChannelInterface *channel, const RpcMethod &method,
GPR_ASSERT(context_); ClientContext *context,
context_->Start(true); const google::protobuf::Message &request)
context_->Write(context_->request(), true); : call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf;
buf.AddSendMessage(request);
buf.AddClientSendClose();
call_.PerformOps(&buf, (void *)1);
cq_.Pluck((void *)1);
} }
~ClientReader() { delete context_; } virtual bool Read(R *msg) {
CallOpBuffer buf;
virtual bool Read(R* msg) { return context_->Read(msg); } buf.AddRecvMessage(msg);
call_.PerformOps(&buf, (void *)2);
virtual void Cancel() { context_->Cancel(); } return cq_.Pluck((void *)2);
}
virtual const Status& Wait() { return context_->Wait(); } virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
call_.PerformOps(&buf, (void *)3);
GPR_ASSERT(cq_.Pluck((void *)3));
return status;
}
private: private:
StreamContextInterface* const context_; CompletionQueue cq_;
Call call_;
}; };
template <class W> template <class W>
class ClientWriter : public ClientStreamingInterface, class ClientWriter final : public ClientStreamingInterface,
public WriterInterface<W> { public WriterInterface<W> {
public: public:
// Blocking create a stream. // Blocking create a stream.
explicit ClientWriter(StreamContextInterface* context) : context_(context) { explicit ClientWriter(ChannelInterface *channel, const RpcMethod &method,
GPR_ASSERT(context_); ClientContext *context,
context_->Start(false); google::protobuf::Message *response)
: response_(response),
call_(channel->CreateCall(method, context, &cq_)) {}
virtual bool Write(const W& msg) {
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf, (void *)2);
return cq_.Pluck((void *)2);
}
virtual bool WritesDone() {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf, (void *)3);
return cq_.Pluck((void *)3);
}
// Read the final response and wait for the final status.
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
call_.PerformOps(&buf, (void *)4);
GPR_ASSERT(cq_.Pluck((void *)4));
return status;
} }
~ClientWriter() { delete context_; } private:
google::protobuf::Message *const response_;
CompletionQueue cq_;
Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientReaderWriter final : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
public:
// Blocking create a stream.
explicit ClientReaderWriter(ChannelInterface *channel,
const RpcMethod &method, ClientContext *context)
: call_(channel->CreateCall(method, context, &cq_)) {}
virtual bool Read(R *msg) {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_.PerformOps(&buf, (void *)2);
return cq_.Pluck((void *)2);
}
virtual bool Write(const W& msg) { virtual bool Write(const W& msg) {
return context_->Write(const_cast<W*>(&msg), false); CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf, (void *)3);
return cq_.Pluck((void *)3);
} }
virtual void WritesDone() { context_->Write(nullptr, true); } virtual bool WritesDone() {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf, (void *)4);
return cq_.Pluck((void *)4);
}
virtual void Cancel() { context_->Cancel(); } virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
call_.PerformOps(&buf, (void *)5);
GPR_ASSERT(cq_.Pluck((void *)5));
return status;
}
// Read the final response and wait for the final status. private:
virtual const Status& Wait() { CompletionQueue cq_;
bool success = context_->Read(context_->response()); Call call_;
if (!success) { };
Cancel();
} else { template <class R>
success = context_->Read(nullptr); class ServerReader : public ReaderInterface<R> {
if (success) { public:
Cancel(); explicit ServerReader(StreamContextInterface* context) : context_(context) {
} GPR_ASSERT(context_);
} context_->Start(true);
return context_->Wait();
} }
virtual bool Read(R* msg) { return context_->Read(msg); }
private: private:
StreamContextInterface* const context_; StreamContextInterface* const context_; // not owned
}; };
// Client-side interface for bi-directional streaming. template <class W>
class ServerWriter : public WriterInterface<W> {
public:
explicit ServerWriter(StreamContextInterface* context) : context_(context) {
GPR_ASSERT(context_);
context_->Start(true);
context_->Read(context_->request());
}
virtual bool Write(const W& msg) {
return context_->Write(const_cast<W*>(&msg), false);
}
private:
StreamContextInterface* const context_; // not owned
};
// Server-side interface for bi-directional streaming.
template <class W, class R> template <class W, class R>
class ClientReaderWriter : public ClientStreamingInterface, class ServerReaderWriter : public WriterInterface<W>,
public WriterInterface<W>,
public ReaderInterface<R> { public ReaderInterface<R> {
public: public:
// Blocking create a stream. explicit ServerReaderWriter(StreamContextInterface* context)
explicit ClientReaderWriter(StreamContextInterface* context)
: context_(context) { : context_(context) {
GPR_ASSERT(context_); GPR_ASSERT(context_);
context_->Start(false); context_->Start(true);
} }
~ClientReaderWriter() { delete context_; }
virtual bool Read(R* msg) { return context_->Read(msg); } virtual bool Read(R* msg) { return context_->Read(msg); }
virtual bool Write(const W& msg) { virtual bool Write(const W& msg) {
return context_->Write(const_cast<W*>(&msg), false); return context_->Write(const_cast<W*>(&msg), false);
} }
virtual void WritesDone() { context_->Write(nullptr, true); } private:
StreamContextInterface* const context_; // not owned
};
virtual void Cancel() { context_->Cancel(); } template <class W>
class ServerAsyncResponseWriter {
public:
explicit ServerAsyncResponseWriter(StreamContextInterface* context) : context_(context) {
GPR_ASSERT(context_);
context_->Start(true);
context_->Read(context_->request());
}
virtual const Status& Wait() { return context_->Wait(); } virtual bool Write(const W& msg) {
return context_->Write(const_cast<W*>(&msg), false);
}
private: private:
StreamContextInterface* const context_; StreamContextInterface* const context_; // not owned
}; };
template <class R> template <class R>
class ServerReader : public ReaderInterface<R> { class ServerAsyncReader : public ReaderInterface<R> {
public: public:
explicit ServerReader(StreamContextInterface* context) : context_(context) { explicit ServerAsyncReader(StreamContextInterface* context) : context_(context) {
GPR_ASSERT(context_); GPR_ASSERT(context_);
context_->Start(true); context_->Start(true);
} }
@ -188,9 +288,9 @@ class ServerReader : public ReaderInterface<R> {
}; };
template <class W> template <class W>
class ServerWriter : public WriterInterface<W> { class ServerAsyncWriter : public WriterInterface<W> {
public: public:
explicit ServerWriter(StreamContextInterface* context) : context_(context) { explicit ServerAsyncWriter(StreamContextInterface* context) : context_(context) {
GPR_ASSERT(context_); GPR_ASSERT(context_);
context_->Start(true); context_->Start(true);
context_->Read(context_->request()); context_->Read(context_->request());
@ -206,10 +306,10 @@ class ServerWriter : public WriterInterface<W> {
// Server-side interface for bi-directional streaming. // Server-side interface for bi-directional streaming.
template <class W, class R> template <class W, class R>
class ServerReaderWriter : public WriterInterface<W>, class ServerAsyncReaderWriter : public WriterInterface<W>,
public ReaderInterface<R> { public ReaderInterface<R> {
public: public:
explicit ServerReaderWriter(StreamContextInterface* context) explicit ServerAsyncReaderWriter(StreamContextInterface* context)
: context_(context) { : context_(context) {
GPR_ASSERT(context_); GPR_ASSERT(context_);
context_->Start(true); context_->Start(true);

@ -61,6 +61,17 @@ bool BidiStreaming(const google::protobuf::MethodDescriptor *method) {
return method->client_streaming() && method->server_streaming(); return method->client_streaming() && method->server_streaming();
} }
bool HasUnaryCalls(const google::protobuf::FileDescriptor *file) {
for (int i = 0; i < file->service_count(); i++) {
for (int j = 0; j < file->service(i)->method_count(); j++) {
if (NoStreaming(file->service(i)->method(j))) {
return true;
}
}
}
return false;
}
bool HasClientOnlyStreaming(const google::protobuf::FileDescriptor *file) { bool HasClientOnlyStreaming(const google::protobuf::FileDescriptor *file) {
for (int i = 0; i < file->service_count(); i++) { for (int i = 0; i < file->service_count(); i++) {
for (int j = 0; j < file->service(i)->method_count(); j++) { for (int j = 0; j < file->service(i)->method_count(); j++) {
@ -104,13 +115,21 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
"class ChannelInterface;\n" "class ChannelInterface;\n"
"class RpcService;\n" "class RpcService;\n"
"class ServerContext;\n"; "class ServerContext;\n";
if (HasUnaryCalls(file)) {
temp.append(
"template <class OutMessage> class ServerAsyncResponseWriter;\n");
}
if (HasClientOnlyStreaming(file)) { if (HasClientOnlyStreaming(file)) {
temp.append("template <class OutMessage> class ClientWriter;\n"); temp.append("template <class OutMessage> class ClientWriter;\n");
temp.append("template <class InMessage> class ServerReader;\n"); temp.append("template <class InMessage> class ServerReader;\n");
temp.append("template <class OutMessage> class ClientAsyncWriter;\n");
temp.append("template <class InMessage> class ServerAsyncReader;\n");
} }
if (HasServerOnlyStreaming(file)) { if (HasServerOnlyStreaming(file)) {
temp.append("template <class InMessage> class ClientReader;\n"); temp.append("template <class InMessage> class ClientReader;\n");
temp.append("template <class OutMessage> class ServerWriter;\n"); temp.append("template <class OutMessage> class ServerWriter;\n");
temp.append("template <class OutMessage> class ClientAsyncReader;\n");
temp.append("template <class InMessage> class ServerAsyncWriter;\n");
} }
if (HasBidiStreaming(file)) { if (HasBidiStreaming(file)) {
temp.append( temp.append(
@ -125,10 +144,10 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
} }
std::string GetSourceIncludes() { std::string GetSourceIncludes() {
return "#include \"grpc++/channel_interface.h\"\n" return "#include <grpc++/channel_interface.h>\n"
"#include \"grpc++/impl/rpc_method.h\"\n" "#include <grpc++/impl/rpc_method.h>\n"
"#include \"grpc++/impl/rpc_service_method.h\"\n" "#include <grpc++/impl/rpc_service_method.h>\n"
"#include \"grpc++/stream.h\"\n"; "#include <grpc++/stream.h>\n";
} }
void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
@ -142,27 +161,45 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
if (NoStreaming(method)) { if (NoStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"::grpc::Status $Method$(::grpc::ClientContext* context, " "::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response);\n\n"); "const $Request$& request, $Response$* response);\n");
printer->Print(*vars,
"void $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, "
"Status *status, "
"CompletionQueue *cq, void *tag);\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(*vars,
*vars, "::grpc::ClientWriter< $Request$>* $Method$("
"::grpc::ClientWriter< $Request$>* $Method$(" "::grpc::ClientContext* context, $Response$* response);\n");
"::grpc::ClientContext* context, $Response$* response);\n\n"); printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* $Method$("
"::grpc::ClientContext* context, $Response$* response, "
"Status *status, "
"CompletionQueue *cq, void *tag);\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
"::grpc::ClientReader< $Response$>* $Method$(" "::grpc::ClientReader< $Response$>* $Method$("
"::grpc::ClientContext* context, const $Request$* request);\n\n"); "::grpc::ClientContext* context, const $Request$* request);\n");
printer->Print(*vars,
"::grpc::ClientReader< $Response$>* $Method$("
"::grpc::ClientContext* context, const $Request$* request, "
"CompletionQueue *cq, void *tag);\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"::grpc::ClientReaderWriter< $Request$, $Response$>* " "::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Method$(::grpc::ClientContext* context);\n\n"); "$Method$(::grpc::ClientContext* context);\n");
printer->Print(*vars,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Method$(::grpc::ClientContext* context, "
"CompletionQueue *cq, void *tag);\n");
} }
} }
void PrintHeaderServerMethod(google::protobuf::io::Printer *printer, void PrintHeaderServerMethodSync(
const google::protobuf::MethodDescriptor *method, google::protobuf::io::Printer *printer,
std::map<std::string, std::string> *vars) { const google::protobuf::MethodDescriptor *method,
std::map<std::string, std::string> *vars) {
(*vars)["Method"] = method->name(); (*vars)["Method"] = method->name();
(*vars)["Request"] = (*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true); grpc_cpp_generator::ClassName(method->input_type(), true);
@ -194,19 +231,57 @@ void PrintHeaderServerMethod(google::protobuf::io::Printer *printer,
} }
} }
void PrintHeaderServerMethodAsync(
google::protobuf::io::Printer *printer,
const google::protobuf::MethodDescriptor *method,
std::map<std::string, std::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true);
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
printer->Print(*vars,
"void $Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* cq, void *tag);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"void $Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Request$>* reader, "
"$Response$* response, "
"::grpc::CompletionQueue* cq, void *tag);\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
"void $Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* cq, void *tag);\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"void $Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* cq, void *tag);\n");
}
}
void PrintHeaderService(google::protobuf::io::Printer *printer, void PrintHeaderService(google::protobuf::io::Printer *printer,
const google::protobuf::ServiceDescriptor *service, const google::protobuf::ServiceDescriptor *service,
std::map<std::string, std::string> *vars) { std::map<std::string, std::string> *vars) {
(*vars)["Service"] = service->name(); (*vars)["Service"] = service->name();
printer->Print(*vars, printer->Print(*vars,
"class $Service$ {\n" "class $Service$ final {\n"
" public:\n"); " public:\n");
printer->Indent(); printer->Indent();
// Client side // Client side
printer->Print( printer->Print(
"class Stub : public ::grpc::InternalStub {\n" "class Stub final : public ::grpc::InternalStub {\n"
" public:\n"); " public:\n");
printer->Indent(); printer->Indent();
for (int i = 0; i < service->method_count(); ++i) { for (int i = 0; i < service->method_count(); ++i) {
@ -220,7 +295,7 @@ void PrintHeaderService(google::protobuf::io::Printer *printer,
printer->Print("\n"); printer->Print("\n");
// Server side // Server side - Synchronous
printer->Print( printer->Print(
"class Service {\n" "class Service {\n"
" public:\n"); " public:\n");
@ -228,7 +303,24 @@ void PrintHeaderService(google::protobuf::io::Printer *printer,
printer->Print("Service() : service_(nullptr) {}\n"); printer->Print("Service() : service_(nullptr) {}\n");
printer->Print("virtual ~Service();\n"); printer->Print("virtual ~Service();\n");
for (int i = 0; i < service->method_count(); ++i) { for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethod(printer, service->method(i), vars); PrintHeaderServerMethodSync(printer, service->method(i), vars);
}
printer->Print("::grpc::RpcService* service();\n");
printer->Outdent();
printer->Print(
" private:\n"
" ::grpc::RpcService* service_;\n");
printer->Print("};\n");
// Server side - Asynchronous
printer->Print(
"class AsyncService final {\n"
" public:\n");
printer->Indent();
printer->Print("AsyncService() : service_(nullptr) {}\n");
printer->Print("~AsyncService();\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodAsync(printer, service->method(i), vars);
} }
printer->Print("::grpc::RpcService* service();\n"); printer->Print("::grpc::RpcService* service();\n");
printer->Outdent(); printer->Outdent();
@ -268,7 +360,7 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, " "::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n"); "const $Request$& request, $Response$* response) {\n");
printer->Print(*vars, printer->Print(*vars,
" return channel()->StartBlockingRpc(" "return ::grpc::BlockingUnaryCall(channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\"), " "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\"), "
"context, request, response);\n" "context, request, response);\n"
"}\n\n"); "}\n\n");
@ -279,10 +371,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, $Response$* response) {\n"); "::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars, printer->Print(*vars,
" return new ::grpc::ClientWriter< $Request$>(" " return new ::grpc::ClientWriter< $Request$>("
"channel()->CreateStream(" "channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
"context, nullptr, response));\n" "context, response);\n"
"}\n\n"); "}\n\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
@ -291,10 +383,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$* request) {\n"); "::grpc::ClientContext* context, const $Request$* request) {\n");
printer->Print(*vars, printer->Print(*vars,
" return new ::grpc::ClientReader< $Response$>(" " return new ::grpc::ClientReader< $Response$>("
"channel()->CreateStream(" "channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), " "::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
"context, request, nullptr));\n" "context, *request);\n"
"}\n\n"); "}\n\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print( printer->Print(
@ -304,10 +396,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
printer->Print( printer->Print(
*vars, *vars,
" return new ::grpc::ClientReaderWriter< $Request$, $Response$>(" " return new ::grpc::ClientReaderWriter< $Request$, $Response$>("
"channel()->CreateStream(" "channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), " "::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context, nullptr, nullptr));\n" "context);\n"
"}\n\n"); "}\n\n");
} }
} }

@ -43,8 +43,10 @@
#include "src/cpp/proto/proto_utils.h" #include "src/cpp/proto/proto_utils.h"
#include "src/cpp/stream/stream_context.h" #include "src/cpp/stream/stream_context.h"
#include <grpc++/call.h>
#include <grpc++/channel_arguments.h> #include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h> #include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h> #include <grpc++/config.h>
#include <grpc++/credentials.h> #include <grpc++/credentials.h>
#include <grpc++/impl/rpc_method.h> #include <grpc++/impl/rpc_method.h>
@ -77,103 +79,23 @@ Channel::Channel(const grpc::string &target,
Channel::~Channel() { grpc_channel_destroy(c_channel_); } Channel::~Channel() { grpc_channel_destroy(c_channel_); }
namespace { Call Channel::CreateCall(const RpcMethod &method, ClientContext *context,
// Pluck the finished event and set to status when it is not nullptr. CompletionQueue *cq) {
void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag, auto c_call =
Status *status) { grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
grpc_event *ev = target_.c_str(), context->RawDeadline());
grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future); context->set_call(c_call);
if (status) { return Call(c_call, this, cq);
StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status);
grpc::string details(ev->data.finished.details ? ev->data.finished.details
: "");
*status = Status(error_code, details);
}
grpc_event_finish(ev);
} }
} // namespace
// TODO(yangg) more error handling void Channel::PerformOpsOnCall(CallOpBuffer *buf, void *tag, Call *call) {
Status Channel::StartBlockingRpc(const RpcMethod &method, static const size_t MAX_OPS = 8;
ClientContext *context, size_t nops = MAX_OPS;
const google::protobuf::Message &request, grpc_op ops[MAX_OPS];
google::protobuf::Message *result) { buf->FillOps(ops, &nops);
Status status; GPR_ASSERT(GRPC_CALL_OK ==
grpc_call *call = grpc_channel_create_call_old( grpc_call_start_batch(call->call(), ops, nops,
c_channel_, method.name(), target_.c_str(), context->RawDeadline()); call->cq()->PrepareTagForC(buf, tag)));
context->set_call(call);
grpc_event *ev;
void *finished_tag = reinterpret_cast<char *>(call);
void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
void *write_tag = reinterpret_cast<char *>(call) + 3;
void *halfclose_tag = reinterpret_cast<char *>(call) + 4;
void *read_tag = reinterpret_cast<char *>(call) + 5;
grpc_completion_queue *cq = grpc_completion_queue_create();
context->set_cq(cq);
// add_metadata from context
//
// invoke
GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
// write request
grpc_byte_buffer *write_buffer = nullptr;
bool success = SerializeProto(request, &write_buffer);
if (!success) {
grpc_call_cancel(call);
status =
Status(StatusCode::DATA_LOSS, "Failed to serialize request proto.");
GetFinalStatus(cq, finished_tag, nullptr);
return status;
}
GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
grpc_byte_buffer_destroy(write_buffer);
ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future);
success = ev->data.write_accepted == GRPC_OP_OK;
grpc_event_finish(ev);
if (!success) {
GetFinalStatus(cq, finished_tag, &status);
return status;
}
// writes done
GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK);
ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future);
grpc_event_finish(ev);
// start read metadata
//
ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future);
grpc_event_finish(ev);
// start read
GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK);
ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future);
if (ev->data.read) {
if (!DeserializeProto(ev->data.read, result)) {
grpc_event_finish(ev);
status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto.");
GetFinalStatus(cq, finished_tag, nullptr);
return status;
}
}
grpc_event_finish(ev);
// wait status
GetFinalStatus(cq, finished_tag, &status);
return status;
}
StreamContextInterface *Channel::CreateStream(
const RpcMethod &method, ClientContext *context,
const google::protobuf::Message *request,
google::protobuf::Message *result) {
grpc_call *call = grpc_channel_create_call_old(
c_channel_, method.name(), target_.c_str(), context->RawDeadline());
context->set_call(call);
grpc_completion_queue *cq = grpc_completion_queue_create();
context->set_cq(cq);
return new StreamContext(method, context, request, result);
} }
} // namespace grpc } // namespace grpc

@ -42,11 +42,14 @@
struct grpc_channel; struct grpc_channel;
namespace grpc { namespace grpc {
class Call;
class CallOpBuffer;
class ChannelArguments; class ChannelArguments;
class CompletionQueue;
class Credentials; class Credentials;
class StreamContextInterface; class StreamContextInterface;
class Channel : public ChannelInterface { class Channel final : public ChannelInterface {
public: public:
Channel(const grpc::string &target, const ChannelArguments &args); Channel(const grpc::string &target, const ChannelArguments &args);
Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds, Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds,
@ -54,14 +57,10 @@ class Channel : public ChannelInterface {
~Channel() override; ~Channel() override;
Status StartBlockingRpc(const RpcMethod &method, ClientContext *context, virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
const google::protobuf::Message &request, CompletionQueue *cq) override;
google::protobuf::Message *result) override; virtual void PerformOpsOnCall(CallOpBuffer *ops, void *tag,
Call *call) override;
StreamContextInterface *CreateStream(
const RpcMethod &method, ClientContext *context,
const google::protobuf::Message *request,
google::protobuf::Message *result) override;
private: private:
const grpc::string target_; const grpc::string target_;

@ -0,0 +1,43 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <include/grpc++/call.h>
#include <include/grpc++/channel_interface.h>
namespace grpc {
void Call::PerformOps(CallOpBuffer* buffer, void* tag) {
channel_->PerformOpsOnCall(buffer, tag, this);
}
} // namespace grpc

@ -1,5 +1,4 @@
/* /*
*
* Copyright 2014, Google Inc. * Copyright 2014, Google Inc.
* All rights reserved. * All rights reserved.
* *
@ -33,6 +32,8 @@
#include <grpc++/completion_queue.h> #include <grpc++/completion_queue.h>
#include <memory>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
@ -47,66 +48,24 @@ CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
CompletionQueue::CompletionType CompletionQueue::Next(void **tag) { // Helper class so we can declare a unique_ptr with grpc_event
grpc_event *ev; class EventDeleter {
CompletionType return_type; public:
bool success; void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); }
};
ev = grpc_completion_queue_next(cq_, gpr_inf_future); bool CompletionQueue::Next(void **tag, bool *ok) {
if (!ev) { std::unique_ptr<grpc_event, EventDeleter> ev;
gpr_log(GPR_ERROR, "no next event in queue");
abort(); ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
} if (ev->type == GRPC_QUEUE_SHUTDOWN) {
switch (ev->type) { return false;
case GRPC_QUEUE_SHUTDOWN:
return_type = QUEUE_CLOSED;
break;
case GRPC_READ:
*tag = ev->tag;
if (ev->data.read) {
success = static_cast<AsyncServerContext *>(ev->tag)
->ParseRead(ev->data.read);
return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR;
} else {
return_type = SERVER_READ_ERROR;
}
break;
case GRPC_WRITE_ACCEPTED:
*tag = ev->tag;
if (ev->data.write_accepted != GRPC_OP_ERROR) {
return_type = SERVER_WRITE_OK;
} else {
return_type = SERVER_WRITE_ERROR;
}
break;
case GRPC_SERVER_RPC_NEW:
GPR_ASSERT(!ev->tag);
// Finishing the pending new rpcs after the server has been shutdown.
if (!ev->call) {
*tag = nullptr;
} else {
*tag = new AsyncServerContext(
ev->call, ev->data.server_rpc_new.method,
ev->data.server_rpc_new.host,
Timespec2Timepoint(ev->data.server_rpc_new.deadline));
}
return_type = SERVER_RPC_NEW;
break;
case GRPC_FINISHED:
*tag = ev->tag;
return_type = RPC_END;
break;
case GRPC_FINISH_ACCEPTED:
*tag = ev->tag;
return_type = HALFCLOSE_OK;
break;
default:
// We do not handle client side messages now
gpr_log(GPR_ERROR, "client-side messages aren't supported yet");
abort();
} }
grpc_event_finish(ev); auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag);
return return_type; cq_tag->FinalizeResult();
*tag = cq_tag->user_tag_;
*ok = ev->data.op_complete == GRPC_OP_OK;
return true;
} }
} // namespace grpc } // namespace grpc

@ -53,7 +53,6 @@ class ServerRpcHandler {
void StartRpc(); void StartRpc();
private: private:
CompletionQueue::CompletionType WaitForNextEvent();
void FinishRpc(const Status &status); void FinishRpc(const Status &status);
std::unique_ptr<AsyncServerContext> async_server_context_; std::unique_ptr<AsyncServerContext> async_server_context_;

Loading…
Cancel
Save