Merge pull request #1969 from ctiller/one-shouldnt-depend-on-protobufs

Remove Protobuf dependency for C++
pull/2184/head
Yang Gao 10 years ago
commit 253aaa6b10
  1. 4
      BUILD
  2. 4
      Makefile
  3. 2
      build.json
  4. 56
      include/grpc++/async_unary_call.h
  5. 23
      include/grpc++/byte_buffer.h
  6. 10
      include/grpc++/client_context.h
  7. 30
      include/grpc++/completion_queue.h
  8. 43
      include/grpc++/config.h
  9. 60
      include/grpc++/config_protobuf.h
  10. 427
      include/grpc++/impl/call.h
  11. 26
      include/grpc++/impl/client_unary_call.h
  12. 33
      include/grpc++/impl/proto_utils.h
  13. 111
      include/grpc++/impl/rpc_service_method.h
  14. 68
      include/grpc++/impl/serialization_traits.h
  15. 41
      include/grpc++/impl/service_type.h
  16. 126
      include/grpc++/server.h
  17. 16
      include/grpc++/server_context.h
  18. 466
      include/grpc++/stream.h
  19. 26
      include/grpc/support/port_platform.h
  20. 7
      src/compiler/config.h
  21. 194
      src/compiler/cpp_generator.cc
  22. 11
      src/cpp/client/channel.cc
  23. 10
      src/cpp/client/channel.h
  24. 307
      src/cpp/common/call.cc
  25. 26
      src/cpp/proto/proto_utils.cc
  26. 240
      src/cpp/server/server.cc
  27. 22
      src/cpp/server/server_context.cc
  28. 2
      test/cpp/end2end/generic_end2end_test.cc
  29. 2
      tools/doxygen/Doxyfile.c++
  30. 2
      tools/doxygen/Doxyfile.c++.internal
  31. 3
      vsprojects/grpc++/grpc++.vcxproj
  32. 6
      vsprojects/grpc++/grpc++.vcxproj.filters
  33. 3
      vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
  34. 6
      vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters

@ -584,7 +584,6 @@ cc_library(
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
@ -625,6 +624,7 @@ cc_library(
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
"include/grpc++/impl/serialization_traits.h",
"include/grpc++/impl/service_type.h",
"include/grpc++/impl/sync.h",
"include/grpc++/impl/sync_cxx11.h",
@ -664,7 +664,6 @@ cc_library(
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
@ -705,6 +704,7 @@ cc_library(
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
"include/grpc++/impl/serialization_traits.h",
"include/grpc++/impl/service_type.h",
"include/grpc++/impl/sync.h",
"include/grpc++/impl/sync_cxx11.h",

@ -3402,7 +3402,6 @@ LIBGRPC++_SRC = \
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
@ -3443,6 +3442,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \
include/grpc++/impl/serialization_traits.h \
include/grpc++/impl/service_type.h \
include/grpc++/impl/sync.h \
include/grpc++/impl/sync_cxx11.h \
@ -3690,7 +3690,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
@ -3731,6 +3730,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \
include/grpc++/impl/serialization_traits.h \
include/grpc++/impl/service_type.h \
include/grpc++/impl/sync.h \
include/grpc++/impl/sync_cxx11.h \

@ -45,6 +45,7 @@
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
"include/grpc++/impl/serialization_traits.h",
"include/grpc++/impl/service_type.h",
"include/grpc++/impl/sync.h",
"include/grpc++/impl/sync_cxx11.h",
@ -72,7 +73,6 @@
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",

@ -51,47 +51,50 @@ class ClientAsyncResponseReaderInterface {
virtual ~ClientAsyncResponseReaderInterface() {}
virtual void ReadInitialMetadata(void* tag) = 0;
virtual void Finish(R* msg, Status* status, void* tag) = 0;
};
template <class R>
class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
template <class W>
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const grpc::protobuf::Message& request)
const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
init_buf_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(init_buf_.SendMessage(request).ok());
init_buf_.ClientSendClose();
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
meta_buf_.set_output_tag(tag);
meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
finish_buf_.Reset(tag);
finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_buf_.RecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(msg);
finish_buf_.AddClientRecvStatus(context_, status);
finish_buf_.RecvMessage(msg);
finish_buf_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
private:
ClientContext* context_;
Call call_;
SneakyCallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> init_buf_;
CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
CallOpClientRecvStatus> finish_buf_;
};
template <class W>
@ -104,34 +107,36 @@ class ServerAsyncResponseWriter GRPC_FINAL
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
finish_buf_.AddSendMessage(msg);
finish_buf_.ServerSendStatus(
ctx_->trailing_metadata_, finish_buf_.SendMessage(msg));
} else {
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
finish_buf_.Reset(tag);
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -140,8 +145,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_buf_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> finish_buf_;
};
} // namespace grpc

@ -39,6 +39,8 @@
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
#include <grpc++/status.h>
#include <grpc++/impl/serialization_traits.h>
#include <vector>
@ -62,7 +64,10 @@ class ByteBuffer GRPC_FINAL {
size_t Length() const;
private:
friend class CallOpBuffer;
friend class SerializationTraits<ByteBuffer, void>;
ByteBuffer(const ByteBuffer&);
ByteBuffer& operator=(const ByteBuffer&);
// takes ownership
void set_buffer(grpc_byte_buffer* buf) {
@ -78,6 +83,22 @@ class ByteBuffer GRPC_FINAL {
grpc_byte_buffer* buffer_;
};
template <>
class SerializationTraits<ByteBuffer, void> {
public:
static Status Deserialize(grpc_byte_buffer* byte_buffer, ByteBuffer* dest,
int max_message_size) {
dest->set_buffer(byte_buffer);
return Status::OK;
}
static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer,
bool* own_buffer) {
*buffer = source.buffer();
*own_buffer = false;
return Status::OK;
}
};
} // namespace grpc
#endif // GRPCXX_BYTE_BUFFER_H

@ -49,7 +49,6 @@ struct grpc_completion_queue;
namespace grpc {
class CallOpBuffer;
class ChannelInterface;
class CompletionQueue;
class Credentials;
@ -115,7 +114,8 @@ class ClientContext {
ClientContext(const ClientContext&);
ClientContext& operator=(const ClientContext&);
friend class CallOpBuffer;
friend class CallOpClientRecvStatus;
friend class CallOpRecvInitialMetadata;
friend class Channel;
template <class R>
friend class ::grpc::ClientReader;
@ -131,6 +131,12 @@ class ClientContext {
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);
grpc_call* call() { return call_; }
void set_call(grpc_call* call,

@ -35,7 +35,6 @@
#define GRPCXX_COMPLETION_QUEUE_H
#include <grpc/support/time.h>
#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/time.h>
@ -55,11 +54,23 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class ChannelInterface;
class ClientContext;
class CompletionQueue;
class RpcMethod;
class Server;
class ServerBuilder;
class ServerContext;
class Status;
class CompletionQueueTag {
public:
@ -84,7 +95,7 @@ class CompletionQueue : public GrpcLibrary {
// Nonblocking (until deadline) read from queue.
// Cannot rely on result of tag or ok if return is TIMEOUT
template<typename T>
template <typename T>
NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
TimePoint<T> deadline_tp(deadline);
return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
@ -118,13 +129,22 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
friend class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
ClientContext* context,
const grpc::protobuf::Message& request,
grpc::protobuf::Message* result);
const InputMessage& request,
OutputMessage* result);
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);

@ -77,31 +77,6 @@
#define GRPC_OVERRIDE override
#endif
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
#endif
#ifndef GRPC_CUSTOM_MESSAGE
#include <google/protobuf/message.h>
#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
#endif
#ifndef GRPC_CUSTOM_STRING
#include <string>
#define GRPC_CUSTOM_STRING std::string
#endif
#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
::google::protobuf::io::ZeroCopyOutputStream
#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
::google::protobuf::io::ZeroCopyInputStream
#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream
#endif
#ifdef GRPC_CXX0X_NO_NULLPTR
#include <memory>
const class {
@ -125,23 +100,15 @@ const class {
} nullptr = {};
#endif
#ifndef GRPC_CUSTOM_STRING
#include <string>
#define GRPC_CUSTOM_STRING std::string
#endif
namespace grpc {
typedef GRPC_CUSTOM_STRING string;
namespace protobuf {
typedef GRPC_CUSTOM_MESSAGE Message;
typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
namespace io {
typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
} // namespace io
} // namespace protobuf
} // namespace grpc
#endif // GRPCXX_CONFIG_H

@ -31,34 +31,42 @@
*
*/
#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/call.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
#ifndef GRPCXX_CONFIG_PROTOBUF_H
#define GRPCXX_CONFIG_PROTOBUF_H
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
#endif
#ifndef GRPC_CUSTOM_MESSAGE
#include <google/protobuf/message.h>
#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
#endif
#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
::google::protobuf::io::ZeroCopyOutputStream
#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
::google::protobuf::io::ZeroCopyInputStream
#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream
#endif
namespace grpc {
namespace protobuf {
// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const grpc::protobuf::Message& request,
grpc::protobuf::Message* result) {
CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf;
Status status;
buf.AddSendInitialMetadata(context);
buf.AddSendMessage(request);
buf.AddRecvInitialMetadata(context);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
buf.AddClientRecvStatus(context, &status);
call.PerformOps(&buf);
GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.ok());
return status;
}
typedef GRPC_CUSTOM_MESSAGE Message;
typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
namespace io {
typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
} // namespace io
} // namespace protobuf
} // namespace grpc
#endif // GRPCXX_CONFIG_PROTOBUF_H

@ -34,14 +34,18 @@
#ifndef GRPCXX_IMPL_CALL_H
#define GRPCXX_IMPL_CALL_H
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
#include <grpc++/impl/serialization_traits.h>
#include <memory>
#include <map>
#include <string.h>
struct grpc_call;
struct grpc_op;
@ -50,84 +54,383 @@ namespace grpc {
class ByteBuffer;
class Call;
class CallOpBuffer : public CompletionQueueTag {
void FillMetadataMap(grpc_metadata_array* arr,
std::multimap<grpc::string, grpc::string>* metadata);
grpc_metadata* FillMetadataArray(
const std::multimap<grpc::string, grpc::string>& metadata);
/// Default argument for CallOpSet. I is unused by the class, but can be
/// used for generating multiple names for the same thing.
template <int I>
class CallNoOp {
protected:
void AddOp(grpc_op* ops, size_t* nops) {}
void FinishOp(bool* status, int max_message_size) {}
};
class CallOpSendInitialMetadata {
public:
CallOpBuffer();
~CallOpBuffer();
void Reset(void* next_return_tag);
// Does not take ownership.
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata);
void AddSendInitialMetadata(ClientContext* ctx);
void AddRecvInitialMetadata(ClientContext* ctx);
void AddSendMessage(const grpc::protobuf::Message& message);
void AddSendMessage(const ByteBuffer& message);
void AddRecvMessage(grpc::protobuf::Message* message);
void AddRecvMessage(ByteBuffer* message);
void AddClientSendClose();
void AddClientRecvStatus(ClientContext* ctx, Status* status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
const Status& status);
void AddServerRecvClose(bool* cancelled);
// INTERNAL API:
// Convert to an array of grpc_op elements
void FillOps(grpc_op* ops, size_t* nops);
// Called by completion queue just prior to returning from Next() or Pluck()
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
CallOpSendInitialMetadata() : send_(false) {}
void set_max_message_size(int max_message_size) {
max_message_size_ = max_message_size;
void SendInitialMetadata(
const std::multimap<grpc::string, grpc::string>& metadata) {
send_ = true;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
}
bool got_message;
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->flags = 0;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
}
void FinishOp(bool* status, int max_message_size) {
if (!send_) return;
gpr_free(initial_metadata_);
send_ = false;
}
private:
void* return_tag_;
// Send initial metadata
bool send_initial_metadata_;
bool send_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
// Recv initial metadta
std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
grpc_metadata_array recv_initial_metadata_arr_;
// Send message
const grpc::protobuf::Message* send_message_;
const ByteBuffer* send_message_buffer_;
};
class CallOpSendMessage {
public:
CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
template <class M>
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (send_buf_ == nullptr) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
op->flags = 0;
op->data.send_message = send_buf_;
}
void FinishOp(bool* status, int max_message_size) {
if (own_buf_) grpc_byte_buffer_destroy(send_buf_);
send_buf_ = nullptr;
}
private:
grpc_byte_buffer* send_buf_;
// Recv message
grpc::protobuf::Message* recv_message_;
ByteBuffer* recv_message_buffer_;
bool own_buf_;
};
template <class M>
Status CallOpSendMessage::SendMessage(const M& message) {
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}
template <class R>
class CallOpRecvMessage {
public:
CallOpRecvMessage() : got_message(false), message_(nullptr) {}
void RecvMessage(R* message) { message_ = message; }
bool got_message;
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (message_ == nullptr) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->data.recv_message = &recv_buf_;
}
void FinishOp(bool* status, int max_message_size) {
if (message_ == nullptr) return;
if (recv_buf_) {
if (*status) {
got_message = true;
*status = SerializationTraits<R>::Deserialize(recv_buf_, message_,
max_message_size)
.ok();
} else {
got_message = false;
grpc_byte_buffer_destroy(recv_buf_);
}
} else {
got_message = false;
*status = false;
}
message_ = nullptr;
}
private:
R* message_;
grpc_byte_buffer* recv_buf_;
int max_message_size_;
// Client send close
bool client_send_close_;
// Client recv status
};
class CallOpGenericRecvMessage {
public:
CallOpGenericRecvMessage() : got_message(false) {}
template <class R>
void RecvMessage(R* message) {
deserialize_ = [message](grpc_byte_buffer* buf,
int max_message_size) -> Status {
return SerializationTraits<R>::Deserialize(buf, message,
max_message_size);
};
}
bool got_message;
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (!deserialize_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->data.recv_message = &recv_buf_;
}
void FinishOp(bool* status, int max_message_size) {
if (!deserialize_) return;
if (recv_buf_) {
if (*status) {
got_message = true;
*status = deserialize_(recv_buf_, max_message_size).ok();
} else {
got_message = false;
grpc_byte_buffer_destroy(recv_buf_);
}
} else {
got_message = false;
*status = false;
}
deserialize_ = DeserializeFunc();
}
private:
typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc;
DeserializeFunc deserialize_;
grpc_byte_buffer* recv_buf_;
};
class CallOpClientSendClose {
public:
CallOpClientSendClose() : send_(false) {}
void ClientSendClose() { send_ = true; }
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
}
void FinishOp(bool* status, int max_message_size) { send_ = false; }
private:
bool send_;
};
class CallOpServerSendStatus {
public:
CallOpServerSendStatus() : send_status_available_(false) {}
void ServerSendStatus(
const std::multimap<grpc::string, grpc::string>& trailing_metadata,
const Status& status) {
trailing_metadata_count_ = trailing_metadata.size();
trailing_metadata_ = FillMetadataArray(trailing_metadata);
send_status_available_ = true;
send_status_code_ = static_cast<grpc_status_code>(status.error_code());
send_status_details_ = status.error_message();
}
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (!send_status_available_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count =
trailing_metadata_count_;
op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
op->data.send_status_from_server.status = send_status_code_;
op->data.send_status_from_server.status_details =
send_status_details_.empty() ? nullptr : send_status_details_.c_str();
op->flags = 0;
}
void FinishOp(bool* status, int max_message_size) {
if (!send_status_available_) return;
gpr_free(trailing_metadata_);
send_status_available_ = false;
}
private:
bool send_status_available_;
grpc_status_code send_status_code_;
grpc::string send_status_details_;
size_t trailing_metadata_count_;
grpc_metadata* trailing_metadata_;
};
class CallOpRecvInitialMetadata {
public:
CallOpRecvInitialMetadata() : recv_initial_metadata_(nullptr) {}
void RecvInitialMetadata(ClientContext* context) {
context->initial_metadata_received_ = true;
recv_initial_metadata_ = &context->recv_initial_metadata_;
}
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (!recv_initial_metadata_) return;
memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &recv_initial_metadata_arr_;
op->flags = 0;
}
void FinishOp(bool* status, int max_message_size) {
if (recv_initial_metadata_ == nullptr) return;
FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
recv_initial_metadata_ = nullptr;
}
private:
std::multimap<grpc::string, grpc::string>* recv_initial_metadata_;
grpc_metadata_array recv_initial_metadata_arr_;
};
class CallOpClientRecvStatus {
public:
CallOpClientRecvStatus() : recv_status_(nullptr) {}
void ClientRecvStatus(ClientContext* context, Status* status) {
recv_trailing_metadata_ = &context->trailing_metadata_;
recv_status_ = status;
}
protected:
void AddOp(grpc_op* ops, size_t* nops) {
if (recv_status_ == nullptr) return;
memset(&recv_trailing_metadata_arr_, 0,
sizeof(recv_trailing_metadata_arr_));
status_details_ = nullptr;
status_details_capacity_ = 0;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
&recv_trailing_metadata_arr_;
op->data.recv_status_on_client.status = &status_code_;
op->data.recv_status_on_client.status_details = &status_details_;
op->data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
op->flags = 0;
}
void FinishOp(bool* status, int max_message_size) {
if (recv_status_ == nullptr) return;
FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
*recv_status_ = Status(
static_cast<StatusCode>(status_code_),
status_details_ ? grpc::string(status_details_) : grpc::string());
gpr_free(status_details_);
recv_status_ = nullptr;
}
private:
std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_;
Status* recv_status_;
grpc_metadata_array recv_trailing_metadata_arr_;
grpc_status_code status_code_;
char* status_details_;
size_t status_details_capacity_;
// Server send status
bool send_status_available_;
grpc_status_code send_status_code_;
grpc::string send_status_details_;
size_t trailing_metadata_count_;
grpc_metadata* trailing_metadata_;
int cancelled_buf_;
bool* recv_closed_;
};
// SneakyCallOpBuffer does not post completions to the completion queue
class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
/// An abstract collection of call ops, used to generate the
/// grpc_call_op structure to pass down to the lower layers,
/// and as it is-a CompletionQueueTag, also massages the final
/// completion into the correct form for consumption in the C++
/// API.
class CallOpSetInterface : public CompletionQueueTag {
public:
CallOpSetInterface() : max_message_size_(0) {}
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
void set_max_message_size(int max_message_size) {
max_message_size_ = max_message_size;
}
protected:
int max_message_size_;
};
/// Primary implementaiton of CallOpSetInterface.
/// Since we cannot use variadic templates, we declare slots up to
/// the maximum count of ops we'll need in a set. We leverage the
/// empty base class optimization to slim this class (especially
/// when there are many unused slots used). To avoid duplicate base classes,
/// the template parmeter for CallNoOp is varied by argument position.
template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
class CallOpSet : public CallOpSetInterface,
public Op1,
public Op2,
public Op3,
public Op4,
public Op5,
public Op6 {
public:
CallOpSet() : return_tag_(this) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
this->Op3::AddOp(ops, nops);
this->Op4::AddOp(ops, nops);
this->Op5::AddOp(ops, nops);
this->Op6::AddOp(ops, nops);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
this->Op1::FinishOp(status, max_message_size_);
this->Op2::FinishOp(status, max_message_size_);
this->Op3::FinishOp(status, max_message_size_);
this->Op4::FinishOp(status, max_message_size_);
this->Op5::FinishOp(status, max_message_size_);
this->Op6::FinishOp(status, max_message_size_);
*tag = return_tag_;
return true;
}
void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
private:
void* return_tag_;
};
/// A CallOpSet that does not post completions to the completion queue.
///
/// Allows hiding some completions that the C core must generate from
/// C++ users.
template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
class SneakyCallOpSet GRPC_FINAL
: public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
return CallOpBuffer::FinalizeResult(tag, status) && false;
typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;
return Base::FinalizeResult(tag, status) && false;
}
};
@ -135,7 +438,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
class CallHook {
public:
virtual ~CallHook() {}
virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0;
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};
// Straightforward wrapping of the C call object
@ -146,7 +449,7 @@ class Call GRPC_FINAL {
Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq,
int max_message_size);
void PerformOps(CallOpBuffer* buffer);
void PerformOps(CallOpSetInterface* ops);
grpc_call* call() { return call_; }
CompletionQueue* cq() { return cq_; }

@ -37,6 +37,8 @@
#include <grpc++/config.h>
#include <grpc++/status.h>
#include <grpc++/impl/call.h>
namespace grpc {
class ChannelInterface;
@ -45,10 +47,28 @@ class CompletionQueue;
class RpcMethod;
// Wrapper that performs a blocking unary call
template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const grpc::protobuf::Message& request,
grpc::protobuf::Message* result);
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq));
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
CallOpClientSendClose, CallOpClientRecvStatus> ops;
Status status = ops.SendMessage(request);
if (!status.ok()) {
return status;
}
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.RecvInitialMetadata(context);
ops.RecvMessage(result);
ops.ClientSendClose();
ops.ClientRecvStatus(context, &status);
call.PerformOps(&ops);
GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok());
return status;
}
} // namespace grpc

@ -34,21 +34,42 @@
#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
#include <grpc++/config.h>
#include <type_traits>
struct grpc_byte_buffer;
#include <grpc/grpc.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/config_protobuf.h>
#include <grpc++/status.h>
namespace grpc {
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization fails,
// false is returned and buffer is left unchanged.
bool SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer);
Status SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg.
bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size);
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size);
template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> {
public:
static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer, bool* own_buffer) {
*own_buffer = true;
return SerializeProto(msg, buffer);
}
static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg,
int max_message_size) {
auto status = DeserializeProto(buffer, msg, max_message_size);
grpc_byte_buffer_destroy(buffer);
return status;
}
};
} // namespace grpc

@ -55,16 +55,19 @@ class MethodHandler {
public:
virtual ~MethodHandler() {}
struct HandlerParameter {
HandlerParameter(Call* c, ServerContext* context,
const grpc::protobuf::Message* req,
grpc::protobuf::Message* resp)
: call(c), server_context(context), request(req), response(resp) {}
HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req,
int max_size)
: call(c),
server_context(context),
request(req),
max_message_size(max_size) {}
Call* call;
ServerContext* server_context;
const grpc::protobuf::Message* request;
grpc::protobuf::Message* response;
// Handler required to grpc_byte_buffer_destroy this
grpc_byte_buffer* request;
int max_message_size;
};
virtual Status RunHandler(const HandlerParameter& param) = 0;
virtual void RunHandler(const HandlerParameter& param) = 0;
};
// A wrapper class of an application provided rpc method handler.
@ -77,11 +80,25 @@ class RpcMethodHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
// Invoke application function, cast proto messages to their actual types.
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request),
dynamic_cast<ResponseType*>(param.response));
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(
param.request, &req, param.max_message_size);
ResponseType rsp;
if (status.ok()) {
status = func_(service_, param.server_context, &req, &rsp);
}
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.ok()) {
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -102,10 +119,21 @@ class ClientStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
return func_(service_, param.server_context, &reader,
dynamic_cast<ResponseType*>(param.response));
ResponseType rsp;
Status status = func_(service_, param.server_context, &reader, &rsp);
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.ok()) {
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -124,10 +152,23 @@ class ServerStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerWriter<ResponseType> writer(param.call, param.server_context);
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request), &writer);
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(
param.request, &req, param.max_message_size);
if (status.ok()) {
ServerWriter<ResponseType> writer(param.call, param.server_context);
status = func_(service_, param.server_context, &req, &writer);
}
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -147,10 +188,18 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
return func_(service_, param.server_context, &stream);
Status status = func_(service_, param.server_context, &stream);
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -162,29 +211,15 @@ class BidiStreamingHandler : public MethodHandler {
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
// Takes ownership of the handler and two prototype objects.
// Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
MethodHandler* handler,
grpc::protobuf::Message* request_prototype,
grpc::protobuf::Message* response_prototype)
: RpcMethod(name, type, nullptr),
handler_(handler),
request_prototype_(request_prototype),
response_prototype_(response_prototype) {}
MethodHandler* handler)
: RpcMethod(name, type, nullptr), handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }
grpc::protobuf::Message* AllocateRequestProto() {
return request_prototype_->New();
}
grpc::protobuf::Message* AllocateResponseProto() {
return response_prototype_->New();
}
private:
std::unique_ptr<MethodHandler> handler_;
std::unique_ptr<grpc::protobuf::Message> request_prototype_;
std::unique_ptr<grpc::protobuf::Message> response_prototype_;
};
// This class contains all the method information for an rpc service. It is

@ -0,0 +1,68 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H
#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H
namespace grpc {
/// Defines how to serialize and deserialize some type.
///
/// Used for hooking different message serialization API's into GRPC.
/// Each SerializationTraits implementation must provide the following
/// functions:
/// static Status Serialize(const Message& msg,
/// grpc_byte_buffer** buffer,
// bool* own_buffer);
/// static Status Deserialize(grpc_byte_buffer* buffer,
/// Message* msg,
/// int max_message_size);
///
/// Serialize is required to convert message to a grpc_byte_buffer, and
/// to store a pointer to that byte buffer at *buffer. *own_buffer should
/// be set to true if the caller owns said byte buffer, or false if
/// ownership is retained elsewhere.
///
/// Deserialize is required to convert buffer into the message stored at
/// msg. max_message_size is passed in as a bound on the maximum number of
/// message bytes Deserialize should accept.
///
/// Both functions return a Status, allowing them to explain what went
/// wrong if required.
template <class Message,
class UnusedButHereForPartialTemplateSpecialization = void>
class SerializationTraits;
} // namespace grpc
#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H

@ -35,6 +35,8 @@
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/config.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/server.h>
#include <grpc++/status.h>
namespace grpc {
@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface {
class AsynchronousService {
public:
// this is Server, but in disguise to avoid a link dependency
class DispatchImpl {
public:
virtual void RequestAsyncCall(void* registered_method,
ServerContext* context,
::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) = 0;
};
AsynchronousService(const char** method_names, size_t method_count)
: dispatch_impl_(nullptr),
: server_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
@ -86,42 +76,43 @@ class AsynchronousService {
~AsynchronousService() { delete[] request_args_; }
protected:
void RequestAsyncUnary(int index, ServerContext* context,
grpc::protobuf::Message* request,
template <class Message>
void RequestAsyncUnary(int index, ServerContext* context, Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
stream, call_cq, notification_cq, tag);
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
notification_cq, tag, request);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
stream, call_cq, notification_cq, tag);
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
notification_cq, tag);
}
template <class Message>
void RequestServerStreaming(int index, ServerContext* context,
grpc::protobuf::Message* request,
Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
stream, call_cq, notification_cq, tag);
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
notification_cq, tag, request);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
stream, call_cq, notification_cq, tag);
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
notification_cq, tag);
}
private:
friend class Server;
DispatchImpl* dispatch_impl_;
Server* server_;
const char** const method_names_;
size_t method_count_;
void** request_args_;

@ -41,25 +41,24 @@
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/impl/sync.h>
#include <grpc++/status.h>
struct grpc_server;
namespace grpc {
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
class ServerAsyncStreamingInterface;
class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
class Server GRPC_FINAL : public GrpcLibrary,
private CallHook,
private AsynchronousService::DispatchImpl {
class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
public:
~Server();
@ -73,6 +72,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
private:
friend class AsyncGenericService;
friend class AsynchronousService;
friend class ServerBuilder;
class SyncRequest;
@ -96,21 +96,123 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RunRpc();
void ScheduleCallback();
void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
class BaseAsyncRequest : public CompletionQueueTag {
public:
BaseAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
virtual ~BaseAsyncRequest();
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
protected:
Server* const server_;
ServerContext* const context_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
grpc_call* call_;
grpc_metadata_array initial_metadata_array_;
};
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
protected:
void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq);
};
class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
NoPayloadAsyncRequest(void* registered_method, Server* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
IssueRequest(registered_method, nullptr, notification_cq);
}
// uses RegisteredAsyncRequest::FinalizeResult
};
template <class Message>
class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
PayloadAsyncRequest(void* registered_method, Server* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag),
request_(request) {
IssueRequest(registered_method, &payload_, notification_cq);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
bool serialization_status =
*status && payload_ &&
SerializationTraits<Message>::Deserialize(payload_, request_,
server_->max_message_size_)
.ok();
bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
*status = serialization_status && *status;
return ret;
}
private:
grpc_byte_buffer* payload_;
Message* const request_;
};
class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
public:
GenericAsyncRequest(Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag);
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
private:
grpc_call_details call_details_;
};
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
call_cq, notification_cq, tag, message);
}
// DispatchImpl
void RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) GRPC_OVERRIDE;
ServerCompletionQueue* notification_cq, void* tag) {
new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
notification_cq, tag);
}
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* cq,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag);
void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag);
}
const int max_message_size_;
@ -133,8 +235,6 @@ class Server GRPC_FINAL : public GrpcLibrary,
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
private:
Server() : max_message_size_(-1), server_(NULL) { abort(); }
};
} // namespace grpc

@ -60,6 +60,14 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class Call;
class CallOpBuffer;
@ -105,6 +113,14 @@ class ServerContext {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
friend class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
// Prevent copying.
ServerContext(const ServerContext&);

@ -93,15 +93,18 @@ template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
template <class W>
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const grpc::protobuf::Message& request)
ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf;
buf.AddSendInitialMetadata(&context->send_initial_metadata_);
buf.AddSendMessage(request);
buf.AddClientSendClose();
call_.PerformOps(&buf);
cq_.Pluck(&buf);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@ -111,28 +114,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
cq_.Pluck(&buf); // status ignored
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(context_);
ops.RecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf) && buf.got_message;
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@ -150,48 +153,49 @@ class ClientWriterInterface : public ClientStreamingInterface,
};
template <class W>
class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> {
class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
template <class R>
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, grpc::protobuf::Message* response)
: context_(context),
response_(response),
call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf;
buf.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&buf);
cq_.Pluck(&buf);
ClientContext* context, R* response)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg).ok()) {
return false;
}
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
Status status;
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
finish_ops_.ClientRecvStatus(context_, &status);
call_.PerformOps(&finish_ops_);
GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
grpc::protobuf::Message* const response_;
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
@ -213,10 +217,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf;
buf.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&buf);
cq_.Pluck(&buf);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@ -226,42 +230,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
cq_.Pluck(&buf); // status ignored
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(context_);
ops.RecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf) && buf.got_message;
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg).ok()) return false;
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@ -279,18 +283,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf) && buf.got_message;
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
@ -306,22 +310,24 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
@ -339,29 +345,31 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf) && buf.got_message;
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
@ -400,57 +408,59 @@ class AsyncWriterInterface {
template <class R>
class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> {
};
public AsyncReaderInterface<R> {};
template <class R>
class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
// Create a stream and write the first request out.
template <class W>
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const grpc::protobuf::Message& request, void* tag)
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
call_.PerformOps(&init_buf_);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose();
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(context_);
read_ops_.RecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_buf_.Reset(tag);
finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_ops_.RecvInitialMetadata(context_);
}
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W>
@ -463,56 +473,57 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
template <class R>
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
grpc::protobuf::Message* response, void* tag)
: context_(context),
response_(response),
call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&init_buf_);
R* response, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
writes_done_ops_.set_output_tag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_buf_.Reset(tag);
finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_ops_.RecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(response_);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
grpc::protobuf::Message* const response_;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpSendMessage> write_ops_;
CallOpSet<CallOpClientSendClose> writes_done_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
CallOpClientRecvStatus> finish_ops_;
};
// Client-side interface for bi-directional streaming.
@ -532,58 +543,59 @@ class ClientAsyncReaderWriter GRPC_FINAL
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&init_buf_);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(context_);
read_ops_.RecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
writes_done_ops_.set_output_tag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_buf_.Reset(tag);
finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_ops_.RecvInitialMetadata(context_);
}
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendMessage> write_ops_;
CallOpSet<CallOpClientSendClose> writes_done_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W, class R>
@ -596,41 +608,44 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.set_output_tag(tag);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
finish_buf_.AddSendMessage(msg);
finish_ops_.ServerSendStatus(
ctx_->trailing_metadata_,
finish_ops_.SendMessage(msg));
} else {
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
call_.PerformOps(&finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
finish_buf_.Reset(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
private:
@ -638,9 +653,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> finish_ops_;
};
template <class W>
@ -653,30 +669,31 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
private:
@ -684,9 +701,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
// Server-side interface for bi-directional streaming.
@ -701,36 +718,37 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.set_output_tag(tag);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
private:
@ -738,10 +756,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
} // namespace grpc

@ -223,7 +223,9 @@
#endif
/* Validate platform combinations */
#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32_ATOMIC) != 1
#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + \
defined(GPR_WIN32_ATOMIC) != \
1
#error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32_ATOMIC
#endif
@ -231,7 +233,9 @@
#error Must define exactly one of GPR_ARCH_32, GPR_ARCH_64
#endif
#if defined(GPR_CPU_LINUX) + defined(GPR_CPU_POSIX) + defined(GPR_WIN32) + defined(GPR_CPU_IPHONE) + defined(GPR_CPU_CUSTOM) != 1
#if defined(GPR_CPU_LINUX) + defined(GPR_CPU_POSIX) + defined(GPR_WIN32) + \
defined(GPR_CPU_IPHONE) + defined(GPR_CPU_CUSTOM) != \
1
#error Must define exactly one of GPR_CPU_LINUX, GPR_CPU_POSIX, GPR_WIN32, GPR_CPU_IPHONE, GPR_CPU_CUSTOM
#endif
@ -239,11 +243,15 @@
#error Must define GPR_POSIX_SOCKET to use GPR_POSIX_MULTIPOLL_WITH_POLL
#endif
#if defined(GPR_POSIX_SOCKET) + defined(GPR_WINSOCK_SOCKET) + defined(GPR_CUSTOM_SOCKET) != 1
#if defined(GPR_POSIX_SOCKET) + defined(GPR_WINSOCK_SOCKET) + \
defined(GPR_CUSTOM_SOCKET) != \
1
#error Must define exactly one of GPR_POSIX_SOCKET, GPR_WINSOCK_SOCKET, GPR_CUSTOM_SOCKET
#endif
#if defined(GPR_MSVC_TLS) + defined(GPR_GCC_TLS) + defined(GPR_PTHREAD_TLS) + defined(GPR_CUSTOM_TLS) != 1
#if defined(GPR_MSVC_TLS) + defined(GPR_GCC_TLS) + defined(GPR_PTHREAD_TLS) + \
defined(GPR_CUSTOM_TLS) != \
1
#error Must define exactly one of GPR_MSVC_TLS, GPR_GCC_TLS, GPR_PTHREAD_TLS, GPR_CUSTOM_TLS
#endif
@ -266,4 +274,12 @@ typedef uintptr_t gpr_uintptr;
power of two */
#define GPR_MAX_ALIGNMENT 16
#endif /* GRPC_SUPPORT_PORT_PLATFORM_H */
#ifndef GRPC_MUST_USE_RESULT
#ifdef __GNUC__
#define GRPC_MUST_USE_RESULT __attribute__((warn_unused_result))
#else
#define GRPC_MUST_USE_RESULT
#endif
#endif
#endif /* GRPC_SUPPORT_PORT_PLATFORM_H */

@ -35,6 +35,7 @@
#define SRC_COMPILER_CONFIG_H
#include <grpc++/config.h>
#include <grpc++/config_protobuf.h>
#ifndef GRPC_CUSTOM_DESCRIPTOR
#include <google/protobuf/descriptor.h>
@ -48,7 +49,8 @@
#ifndef GRPC_CUSTOM_CODEGENERATOR
#include <google/protobuf/compiler/code_generator.h>
#define GRPC_CUSTOM_CODEGENERATOR ::google::protobuf::compiler::CodeGenerator
#define GRPC_CUSTOM_GENERATORCONTEXT ::google::protobuf::compiler::GeneratorContext
#define GRPC_CUSTOM_GENERATORCONTEXT \
::google::protobuf::compiler::GeneratorContext
#endif
#ifndef GRPC_CUSTOM_PRINTER
@ -57,7 +59,8 @@
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#define GRPC_CUSTOM_PRINTER ::google::protobuf::io::Printer
#define GRPC_CUSTOM_CODEDOUTPUTSTREAM ::google::protobuf::io::CodedOutputStream
#define GRPC_CUSTOM_STRINGOUTPUTSTREAM ::google::protobuf::io::StringOutputStream
#define GRPC_CUSTOM_STRINGOUTPUTSTREAM \
::google::protobuf::io::StringOutputStream
#endif
#ifndef GRPC_CUSTOM_PLUGINMAIN

@ -97,7 +97,8 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file,
vars["filename_base"] = grpc_generator::StripProto(file->name());
printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
printer.Print(vars, "// If you make any local change, they will be lost.\n");
printer.Print(vars,
"// If you make any local change, they will be lost.\n");
printer.Print(vars, "// source: $filename$\n");
printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n");
printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n");
@ -113,6 +114,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
@ -141,10 +143,10 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
return temp;
}
void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars,
bool is_public) {
void PrintHeaderClientMethodInterfaces(
grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars, bool is_public) {
(*vars)["Method"] = method->name();
(*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true);
@ -157,19 +159,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
*vars,
"virtual ::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) = 0;\n");
printer->Print(
*vars,
"std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n");
printer->Print(*vars,
"std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n");
printer->Indent();
printer->Print(
*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq));\n");
printer->Print(*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
@ -188,14 +188,14 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>"
" Async$Method$(::grpc::ClientContext* context, $Response$* response, "
" Async$Method$(::grpc::ClientContext* context, $Response$* "
"response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
printer->Print(
*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncWriterInterface< $Request$>>("
"Async$Method$Raw(context, response, cq, tag));\n");
printer->Print(*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncWriterInterface< $Request$>>("
"Async$Method$Raw(context, response, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ServerOnlyStreaming(method)) {
@ -218,18 +218,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
printer->Print(
*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n");
printer->Print(*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientReaderWriterInterface< $Request$, $Response$>> "
"$Method$(::grpc::ClientContext* context) {\n");
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientReaderWriterInterface< "
"$Request$, $Response$>> "
"$Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
printer->Print(
*vars,
@ -267,12 +266,11 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"virtual ::grpc::ClientWriterInterface< $Request$>*"
" $Method$Raw("
"::grpc::ClientContext* context, $Response$* response) = 0;\n");
printer->Print(
*vars,
"virtual ::grpc::ClientAsyncWriterInterface< $Request$>*"
" Async$Method$Raw(::grpc::ClientContext* context, "
"$Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n");
printer->Print(*vars,
"virtual ::grpc::ClientAsyncWriterInterface< $Request$>*"
" Async$Method$Raw(::grpc::ClientContext* context, "
"$Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@ -285,16 +283,15 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"virtual ::grpc::ClientReaderWriterInterface< $Request$, $Response$>* "
"$Method$Raw(::grpc::ClientContext* context) = 0;\n");
printer->Print(
*vars,
"virtual ::grpc::ClientAsyncReaderWriterInterface< "
"$Request$, $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n");
printer->Print(*vars,
"virtual ::grpc::ClientReaderWriterInterface< $Request$, "
"$Response$>* "
"$Method$Raw(::grpc::ClientContext* context) = 0;\n");
printer->Print(*vars,
"virtual ::grpc::ClientAsyncReaderWriterInterface< "
"$Request$, $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n");
}
}
}
@ -321,11 +318,10 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n");
printer->Indent();
printer->Print(
*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncResponseReader< $Response$>>("
"Async$Method$Raw(context, request, cq));\n");
printer->Print(*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncResponseReader< $Response$>>("
"Async$Method$Raw(context, request, cq));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
@ -335,17 +331,16 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
" $Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Indent();
printer->Print(
*vars,
"return std::unique_ptr< ::grpc::ClientWriter< $Request$>>"
"($Method$Raw(context, response));\n");
printer->Print(*vars,
"return std::unique_ptr< ::grpc::ClientWriter< $Request$>>"
"($Method$Raw(context, response));\n");
printer->Outdent();
printer->Print("}\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>"
" Async$Method$(::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>"
" Async$Method$(::grpc::ClientContext* context, "
"$Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
printer->Print(
*vars,
@ -385,53 +380,47 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>>"
" $Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
printer->Print(
*vars,
"return std::unique_ptr< "
"::grpc::ClientReaderWriter< $Request$, $Response$>>("
"$Method$Raw(context));\n");
printer->Print(*vars,
"return std::unique_ptr< "
"::grpc::ClientReaderWriter< $Request$, $Response$>>("
"$Method$Raw(context));\n");
printer->Outdent();
printer->Print("}\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>> "
"Async$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>> "
"Async$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
printer->Print(
*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>("
"Async$Method$Raw(context, cq, tag));\n");
printer->Print(*vars,
"return std::unique_ptr< "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>("
"Async$Method$Raw(context, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
}
} else {
if (NoStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
printer->Print(*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientWriter< $Request$>* $Method$Raw("
"::grpc::ClientContext* context, $Response$* response) "
"GRPC_OVERRIDE;\n");
printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* $Method$Raw("
"::grpc::ClientContext* context, $Response$* response) "
"GRPC_OVERRIDE;\n");
printer->Print(
*vars,
"::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientReader< $Response$>* $Method$Raw("
"::grpc::ClientContext* context, const $Request$& request)"
" GRPC_OVERRIDE;\n");
printer->Print(*vars,
"::grpc::ClientReader< $Response$>* $Method$Raw("
"::grpc::ClientContext* context, const $Request$& request)"
" GRPC_OVERRIDE;\n");
printer->Print(
*vars,
"::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw("
@ -629,7 +618,7 @@ grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
{
// Scope the output stream so it closes and finalizes output to the string.
// Scope the output stream so it closes and finalizes output to the string.
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
@ -693,7 +682,8 @@ grpc::string GetSourcePrologue(const grpc::protobuf::FileDescriptor *file,
vars["filename_base"] = grpc_generator::StripProto(file->name());
printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
printer.Print(vars, "// If you make any local change, they will be lost.\n");
printer.Print(vars,
"// If you make any local change, they will be lost.\n");
printer.Print(vars, "// source: $filename$\n\n");
printer.Print(vars, "#include \"$filename_base$.pb.h\"\n");
printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n");
@ -1056,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@ -1066,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@ -1076,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
@ -1086,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
printer->Print("return service_;\n");

@ -41,7 +41,6 @@
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
return Call(c_call, this, cq);
}
void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = MAX_OPS;
grpc_op ops[MAX_OPS];
size_t nops = 0;
grpc_op cops[MAX_OPS];
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
buf->FillOps(ops, &nops);
ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), ops, nops, buf));
grpc_call_start_batch(call->call(), cops, nops, ops));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}

@ -44,22 +44,22 @@ struct grpc_channel;
namespace grpc {
class Call;
class CallOpBuffer;
class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
class StreamContextInterface;
class Channel GRPC_FINAL : public GrpcLibrary,
public ChannelInterface {
class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
public:
Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
virtual void PerformOpsOnCall(CallOpSetInterface* ops,
Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;

@ -39,107 +39,32 @@
#include <grpc++/channel_interface.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
namespace grpc {
CallOpBuffer::CallOpBuffer()
: return_tag_(this),
send_initial_metadata_(false),
initial_metadata_count_(0),
initial_metadata_(nullptr),
recv_initial_metadata_(nullptr),
send_message_(nullptr),
send_message_buffer_(nullptr),
send_buf_(nullptr),
recv_message_(nullptr),
recv_message_buffer_(nullptr),
recv_buf_(nullptr),
max_message_size_(-1),
client_send_close_(false),
recv_trailing_metadata_(nullptr),
recv_status_(nullptr),
status_code_(GRPC_STATUS_OK),
status_details_(nullptr),
status_details_capacity_(0),
send_status_available_(false),
send_status_code_(GRPC_STATUS_OK),
trailing_metadata_count_(0),
trailing_metadata_(nullptr),
cancelled_buf_(0),
recv_closed_(nullptr) {
memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_));
memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
recv_trailing_metadata_arr_.metadata = nullptr;
recv_initial_metadata_arr_.metadata = nullptr;
}
void CallOpBuffer::Reset(void* next_return_tag) {
return_tag_ = next_return_tag;
send_initial_metadata_ = false;
initial_metadata_count_ = 0;
gpr_free(initial_metadata_);
recv_initial_metadata_ = nullptr;
recv_initial_metadata_arr_.count = 0;
if (send_buf_ && send_message_) {
grpc_byte_buffer_destroy(send_buf_);
}
send_message_ = nullptr;
send_message_buffer_ = nullptr;
send_buf_ = nullptr;
got_message = false;
if (recv_buf_ && recv_message_) {
grpc_byte_buffer_destroy(recv_buf_);
}
recv_message_ = nullptr;
recv_message_buffer_ = nullptr;
recv_buf_ = nullptr;
client_send_close_ = false;
recv_trailing_metadata_ = nullptr;
recv_status_ = nullptr;
recv_trailing_metadata_arr_.count = 0;
status_code_ = GRPC_STATUS_OK;
send_status_available_ = false;
send_status_code_ = GRPC_STATUS_OK;
send_status_details_.clear();
trailing_metadata_count_ = 0;
trailing_metadata_ = nullptr;
recv_closed_ = nullptr;
}
CallOpBuffer::~CallOpBuffer() {
gpr_free(status_details_);
gpr_free(recv_initial_metadata_arr_.metadata);
gpr_free(recv_trailing_metadata_arr_.metadata);
if (recv_buf_ && recv_message_) {
grpc_byte_buffer_destroy(recv_buf_);
}
if (send_buf_ && send_message_) {
grpc_byte_buffer_destroy(send_buf_);
void FillMetadataMap(grpc_metadata_array* arr,
std::multimap<grpc::string, grpc::string>* metadata) {
for (size_t i = 0; i < arr->count; i++) {
// TODO(yangg) handle duplicates?
metadata->insert(std::pair<grpc::string, grpc::string>(
arr->metadata[i].key,
grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
}
grpc_metadata_array_destroy(arr);
grpc_metadata_array_init(arr);
}
namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
grpc_metadata* FillMetadataArray(
std::multimap<grpc::string, grpc::string>* metadata) {
if (metadata->empty()) {
const std::multimap<grpc::string, grpc::string>& metadata) {
if (metadata.empty()) {
return nullptr;
}
grpc_metadata* metadata_array =
(grpc_metadata*)gpr_malloc(metadata->size() * sizeof(grpc_metadata));
(grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata));
size_t i = 0;
for (auto iter = metadata->cbegin(); iter != metadata->cend(); ++iter, ++i) {
for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
metadata_array[i].key = iter->first.c_str();
metadata_array[i].value = iter->second.c_str();
metadata_array[i].value_length = iter->second.size();
@ -147,206 +72,6 @@ grpc_metadata* FillMetadataArray(
return metadata_array;
}
void FillMetadataMap(grpc_metadata_array* arr,
std::multimap<grpc::string, grpc::string>* metadata) {
for (size_t i = 0; i < arr->count; i++) {
// TODO(yangg) handle duplicates?
metadata->insert(std::pair<grpc::string, grpc::string>(
arr->metadata[i].key,
grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
}
grpc_metadata_array_destroy(arr);
grpc_metadata_array_init(arr);
}
} // namespace
void CallOpBuffer::AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
send_initial_metadata_ = true;
initial_metadata_count_ = metadata->size();
initial_metadata_ = FillMetadataArray(metadata);
}
void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) {
ctx->initial_metadata_received_ = true;
recv_initial_metadata_ = &ctx->recv_initial_metadata_;
}
void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
AddSendInitialMetadata(&ctx->send_initial_metadata_);
}
void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
send_message_ = &message;
}
void CallOpBuffer::AddSendMessage(const ByteBuffer& message) {
send_message_buffer_ = &message;
}
void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
recv_message_ = message;
recv_message_->Clear();
}
void CallOpBuffer::AddRecvMessage(ByteBuffer* message) {
recv_message_buffer_ = message;
recv_message_buffer_->Clear();
}
void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; }
void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
recv_closed_ = cancelled;
}
void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) {
recv_trailing_metadata_ = &context->trailing_metadata_;
recv_status_ = status;
}
void CallOpBuffer::AddServerSendStatus(
std::multimap<grpc::string, grpc::string>* metadata, const Status& status) {
if (metadata != NULL) {
trailing_metadata_count_ = metadata->size();
trailing_metadata_ = FillMetadataArray(metadata);
} else {
trailing_metadata_count_ = 0;
}
send_status_available_ = true;
send_status_code_ = static_cast<grpc_status_code>(status.error_code());
send_status_details_ = status.error_message();
}
void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
*nops = 0;
if (send_initial_metadata_) {
ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
ops[*nops].flags = 0;
(*nops)++;
}
if (recv_initial_metadata_) {
ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
ops[*nops].flags = 0;
(*nops)++;
}
if (send_message_ || send_message_buffer_) {
if (send_message_) {
GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_SERIALIZE, 0);
bool success = SerializeProto(*send_message_, &send_buf_);
if (!success) {
abort();
// TODO handle parse failure
}
GRPC_TIMER_END(GRPC_PTAG_PROTO_SERIALIZE, 0);
} else {
send_buf_ = send_message_buffer_->buffer();
}
ops[*nops].op = GRPC_OP_SEND_MESSAGE;
ops[*nops].data.send_message = send_buf_;
ops[*nops].flags = 0;
(*nops)++;
}
if (recv_message_ || recv_message_buffer_) {
ops[*nops].op = GRPC_OP_RECV_MESSAGE;
ops[*nops].data.recv_message = &recv_buf_;
ops[*nops].flags = 0;
(*nops)++;
}
if (client_send_close_) {
ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[*nops].flags = 0;
(*nops)++;
}
if (recv_status_) {
ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[*nops].data.recv_status_on_client.trailing_metadata =
&recv_trailing_metadata_arr_;
ops[*nops].data.recv_status_on_client.status = &status_code_;
ops[*nops].data.recv_status_on_client.status_details = &status_details_;
ops[*nops].data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
ops[*nops].flags = 0;
(*nops)++;
}
if (send_status_available_) {
ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[*nops].data.send_status_from_server.trailing_metadata_count =
trailing_metadata_count_;
ops[*nops].data.send_status_from_server.trailing_metadata =
trailing_metadata_;
ops[*nops].data.send_status_from_server.status = send_status_code_;
ops[*nops].data.send_status_from_server.status_details =
send_status_details_.empty() ? nullptr : send_status_details_.c_str();
ops[*nops].flags = 0;
(*nops)++;
}
if (recv_closed_) {
ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_;
ops[*nops].flags = 0;
(*nops)++;
}
}
bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
// Release send buffers.
if (send_buf_ && send_message_) {
if (send_message_) {
grpc_byte_buffer_destroy(send_buf_);
}
send_buf_ = nullptr;
}
if (initial_metadata_) {
gpr_free(initial_metadata_);
initial_metadata_ = nullptr;
}
if (trailing_metadata_count_) {
gpr_free(trailing_metadata_);
trailing_metadata_ = nullptr;
}
// Set user-facing tag.
*tag = return_tag_;
// Process received initial metadata
if (recv_initial_metadata_) {
FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
}
// Parse received message if any.
if (recv_message_ || recv_message_buffer_) {
if (recv_buf_) {
got_message = *status;
if (recv_message_) {
GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, 0);
*status = *status &&
DeserializeProto(recv_buf_, recv_message_, max_message_size_);
grpc_byte_buffer_destroy(recv_buf_);
GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, 0);
} else {
recv_message_buffer_->set_buffer(recv_buf_);
}
recv_buf_ = nullptr;
} else {
// Read failed
got_message = false;
*status = false;
}
}
// Parse received status.
if (recv_status_) {
FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
*recv_status_ = Status(
static_cast<StatusCode>(status_code_),
status_details_ ? grpc::string(status_details_) : grpc::string());
}
if (recv_closed_) {
*recv_closed_ = cancelled_buf_ != 0;
}
return true;
}
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
: call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
@ -357,11 +82,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
call_(call),
max_message_size_(max_message_size) {}
void Call::PerformOps(CallOpBuffer* buffer) {
void Call::PerformOps(CallOpSetInterface* ops) {
if (max_message_size_ > 0) {
buffer->set_max_message_size(max_message_size_);
ops->set_max_message_size(max_message_size_);
}
call_hook_->PerformOpsOnCall(buffer, this);
call_hook_->PerformOpsOnCall(ops, this);
}
} // namespace grpc

@ -31,7 +31,7 @@
*
*/
#include "src/cpp/proto/proto_utils.h"
#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
@ -67,7 +67,7 @@ class GrpcBufferWriter GRPC_FINAL
slice_ = gpr_slice_malloc(block_size_);
}
*data = GPR_SLICE_START_PTR(slice_);
byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
gpr_slice_buffer_add(slice_buffer_, slice_);
return true;
}
@ -118,7 +118,7 @@ class GrpcBufferReader GRPC_FINAL
}
gpr_slice_unref(slice_);
*data = GPR_SLICE_START_PTR(slice_);
byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
return true;
}
@ -152,20 +152,28 @@ class GrpcBufferReader GRPC_FINAL
namespace grpc {
bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
return msg.SerializeToZeroCopyStream(&writer);
return msg.SerializeToZeroCopyStream(&writer) ? Status::OK : Status(INVALID_ARGUMENT, "Failed to serialize message");
}
bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size) {
if (!buffer) return false;
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size) {
if (!buffer) {
return Status(INVALID_ARGUMENT, "No payload");
}
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
if (!msg->ParseFromCodedStream(&decoder)) {
return Status(INVALID_ARGUMENT, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
return Status(INVALID_ARGUMENT, "Did not read entire message");
}
return Status::OK;
}
} // namespace grpc

@ -48,7 +48,6 @@
#include <grpc++/time.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@ -69,16 +68,11 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::CLIENT_STREAMING),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
~SyncRequest() {
grpc_metadata_array_destroy(&request_metadata_);
}
~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@ -91,9 +85,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
void SetupRequest() {
cq_ = grpc_completion_queue_create();
}
void SetupRequest() { cq_ = grpc_completion_queue_create(); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@ -125,7 +117,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
@ -142,35 +133,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
void Run() {
std::unique_ptr<grpc::protobuf::Message> req;
std::unique_ptr<grpc::protobuf::Message> res;
if (has_request_payload_) {
GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
req.reset(method_->AllocateRequestProto());
if (!DeserializeProto(request_payload_, req.get(),
call_.max_message_size())) {
// FIXME(yangg) deal with deserialization failure
cq_.Shutdown();
return;
}
GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
}
if (has_response_payload_) {
res.reset(method_->AllocateResponseProto());
}
ctx_.BeginCompletionOp(&call_);
auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
CallOpBuffer buf;
if (!ctx_.sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
}
if (has_response_payload_) {
buf.AddSendMessage(*res);
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
call_.PerformOps(&buf);
cq_.Pluck(&buf); /* status ignored */
method_->handler()->RunHandler(MethodHandler::HandlerParameter(
&call_, &ctx_, request_payload_, call_.max_message_size()));
request_payload_ = nullptr;
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
@ -182,7 +148,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Call call_;
ServerContext ctx_;
const bool has_request_payload_;
const bool has_response_payload_;
grpc_byte_buffer* request_payload_;
RpcServiceMethod* const method_;
};
@ -192,7 +157,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
const bool has_response_payload_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
@ -260,9 +224,9 @@ bool Server::RegisterService(RpcService* service) {
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
GPR_ASSERT(service->dispatch_impl_ == nullptr &&
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->dispatch_impl_ = this;
service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
@ -328,141 +292,87 @@ void Server::Wait() {
}
}
void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = MAX_OPS;
grpc_op ops[MAX_OPS];
buf->FillOps(ops, &nops);
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), ops, nops, buf));
grpc_call_start_batch(call->call(), cops, nops, ops));
}
class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag),
request_(request),
stream_(stream),
call_cq_(call_cq),
ctx_(ctx),
generic_ctx_(nullptr),
server_(server),
call_(nullptr),
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_registered_call(
server->server_, registered_method, &call_, &call_details_.deadline,
&array_, request ? &payload_ : nullptr, call_cq->cq(),
notification_cq->cq(), this);
}
Server::BaseAsyncRequest::BaseAsyncRequest(
Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
: server_(server),
context_(context),
stream_(stream),
call_cq_(call_cq),
tag_(tag),
call_(nullptr) {
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
AsyncRequest(Server* server, GenericServerContext* ctx,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag),
request_(nullptr),
stream_(stream),
call_cq_(call_cq),
ctx_(nullptr),
generic_ctx_(ctx),
server_(server),
call_(nullptr),
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
call_cq->cq(), notification_cq->cq(), this);
}
Server::BaseAsyncRequest::~BaseAsyncRequest() {}
~AsyncRequest() {
if (payload_) {
grpc_byte_buffer_destroy(payload_);
bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
if (*status) {
for (size_t i = 0; i < initial_metadata_array_.count; i++) {
context_->client_metadata_.insert(std::make_pair(
grpc::string(initial_metadata_array_.metadata[i].key),
grpc::string(initial_metadata_array_.metadata[i].value,
initial_metadata_array_.metadata[i].value +
initial_metadata_array_.metadata[i].value_length)));
}
grpc_metadata_array_destroy(&array_);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
*tag = tag_;
bool orig_status = *status;
if (*status && request_) {
if (payload_) {
GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_);
*status =
DeserializeProto(payload_, request_, server_->max_message_size_);
GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_);
} else {
*status = false;
}
}
ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
GPR_ASSERT(ctx);
if (*status) {
ctx->deadline_ = call_details_.deadline;
for (size_t i = 0; i < array_.count; i++) {
ctx->client_metadata_.insert(std::make_pair(
grpc::string(array_.metadata[i].key),
grpc::string(
array_.metadata[i].value,
array_.metadata[i].value + array_.metadata[i].value_length)));
}
if (generic_ctx_) {
// TODO(yangg) remove the copy here.
generic_ctx_->method_ = call_details_.method;
generic_ctx_->host_ = call_details_.host;
gpr_free(call_details_.method);
gpr_free(call_details_.host);
}
}
ctx->call_ = call_;
ctx->cq_ = call_cq_;
Call call(call_, server_, call_cq_, server_->max_message_size_);
if (orig_status && call_) {
ctx->BeginCompletionOp(&call);
}
// just the pointers inside call are copied here
stream_->BindCall(&call);
delete this;
return true;
grpc_metadata_array_destroy(&initial_metadata_array_);
context_->call_ = call_;
context_->cq_ = call_cq_;
Call call(call_, server_, call_cq_, server_->max_message_size_);
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
// just the pointers inside call are copied here
stream_->BindCall(&call);
*tag = tag_;
delete this;
return true;
}
private:
void* const tag_;
grpc::protobuf::Message* const request_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
ServerContext* const ctx_;
GenericServerContext* const generic_ctx_;
Server* const server_;
grpc_call* call_;
grpc_call_details call_details_;
grpc_metadata_array array_;
grpc_byte_buffer* payload_;
};
Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag) {}
void Server::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) {
grpc_server_request_registered_call(
server_->server_, registered_method, &call_, &context_->deadline_,
&initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
this);
}
void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
new AsyncRequest(this, registered_method, context, request, stream, call_cq,
notification_cq, tag);
Server::GenericAsyncRequest::GenericAsyncRequest(
Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag) {
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_call(server->server_, &call_, &call_details_,
&initial_metadata_array_, call_cq->cq(),
notification_cq->cq(), this);
}
void Server::RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
// TODO(yangg) remove the copy here.
static_cast<GenericServerContext*>(context_)->method_ = call_details_.method;
static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
gpr_free(call_details_.method);
gpr_free(call_details_.host);
return BaseAsyncRequest::FinalizeResult(tag, status);
}
void Server::ScheduleCallback() {

@ -43,12 +43,12 @@ namespace grpc {
// CompletionOp
class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
CompletionOp() : refs_(2), finalized_(false), cancelled_(false) {
AddServerRecvClose(&cancelled_);
}
CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
grpc::mutex mu_;
int refs_;
bool finalized_;
bool cancelled_;
int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
@ -73,14 +73,20 @@ void ServerContext::CompletionOp::Unref() {
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
return finalized_ ? cancelled_ : false;
return finalized_ ? cancelled_ != 0 : false;
}
void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
ops->flags = 0;
*nops = 1;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
if (!*status) cancelled_ = true;
if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;

@ -33,10 +33,10 @@
#include <memory>
#include "src/cpp/proto/proto_utils.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/echo.grpc.pb.h"
#include <grpc++/impl/proto_utils.h>
#include <grpc++/async_generic_service.h>
#include <grpc++/async_unary_call.h>
#include <grpc++/byte_buffer.h>

@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h
INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/serialization_traits.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/client_unary_call.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/serialization_traits.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

@ -163,6 +163,7 @@
<ClInclude Include="..\..\include\grpc++\impl\internal_stub.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_method.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h" />
<ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h" />
<ClInclude Include="..\..\include\grpc++\impl\service_type.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync_cxx11.h" />
@ -199,8 +200,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">

@ -16,9 +16,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
@ -135,6 +132,9 @@
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\service_type.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>

@ -163,6 +163,7 @@
<ClInclude Include="..\..\include\grpc++\impl\internal_stub.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_method.h" />
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h" />
<ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h" />
<ClInclude Include="..\..\include\grpc++\impl\service_type.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync.h" />
<ClInclude Include="..\..\include\grpc++\impl\sync_cxx11.h" />
@ -193,8 +194,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">

@ -10,9 +10,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
@ -129,6 +126,9 @@
<ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\serialization_traits.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\impl\service_type.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>

Loading…
Cancel
Save