Progress commit on fixing up C++

pull/1969/head
Craig Tiller 10 years ago
parent a6ab47c75b
commit 50a7a68ca2
  1. 2
      BUILD
  2. 2
      Makefile
  3. 1
      build.json
  4. 15
      include/grpc++/async_unary_call.h
  5. 17
      include/grpc++/byte_buffer.h
  6. 6
      include/grpc++/client_context.h
  7. 17
      include/grpc++/completion_queue.h
  8. 2
      include/grpc++/config_protobuf.h
  9. 109
      include/grpc++/impl/call.h
  10. 12
      include/grpc++/impl/client_unary_call.h
  11. 21
      include/grpc++/impl/proto_utils.h
  12. 101
      include/grpc++/impl/rpc_service_method.h
  13. 5
      include/grpc++/impl/serialization_traits.h
  14. 42
      include/grpc++/server.h
  15. 16
      include/grpc++/server_context.h
  16. 60
      include/grpc++/stream.h
  17. 13
      src/compiler/cpp_generator.cc
  18. 11
      src/cpp/client/channel.cc
  19. 4
      src/cpp/client/channel.h
  20. 9
      src/cpp/common/call.cc
  21. 16
      src/cpp/proto/proto_utils.cc
  22. 59
      src/cpp/server/server.cc
  23. 21
      src/cpp/server/server_context.cc
  24. 2
      tools/doxygen/Doxyfile.c++.internal
  25. 2
      vsprojects/grpc++/grpc++.vcxproj
  26. 3
      vsprojects/grpc++/grpc++.vcxproj.filters
  27. 2
      vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
  28. 3
      vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters

@ -576,7 +576,6 @@ cc_library(
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",
@ -656,7 +655,6 @@ cc_library(
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",

@ -3325,7 +3325,6 @@ LIBGRPC++_SRC = \
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \
@ -3613,7 +3612,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/generic_stub.cc \

@ -72,7 +72,6 @@
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
"src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/generic_stub.cc",

@ -51,7 +51,6 @@ class ClientAsyncResponseReaderInterface {
virtual ~ClientAsyncResponseReaderInterface() {}
virtual void ReadInitialMetadata(void* tag) = 0;
virtual void Finish(R* msg, Status* status, void* tag) = 0;
};
template <class R>
@ -72,15 +71,15 @@ class ClientAsyncResponseReader GRPC_FINAL
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.SetOutputTag(tag);
meta_buf_.set_output_tag(tag);
meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
finish_buf_.SetOutputTag(tag);
finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_buf_.RecvInitialMetadata(context_);
}
finish_buf_.RecvMessage(msg);
finish_buf_.ClientRecvStatus(context_, status);
@ -92,7 +91,7 @@ class ClientAsyncResponseReader GRPC_FINAL
Call call_;
SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_buf_;
CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
CallOpSet<CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_;
};
template <class W>
@ -105,14 +104,14 @@ class ServerAsyncResponseWriter GRPC_FINAL
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.SetOutputTag(tag);
meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.SetOutputTag(tag);
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@ -127,7 +126,7 @@ class ServerAsyncResponseWriter GRPC_FINAL
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.SetOutputTag(tag);
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;

@ -38,6 +38,8 @@
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
#include <grpc++/status.h>
#include <grpc++/impl/serialization_traits.h>
#include <vector>
@ -60,9 +62,6 @@ class ByteBuffer GRPC_FINAL {
void Clear();
size_t Length();
private:
friend class CallOpBuffer;
// takes ownership
void set_buffer(grpc_byte_buffer* buf) {
if (buffer_) {
@ -72,11 +71,23 @@ class ByteBuffer GRPC_FINAL {
buffer_ = buf;
}
private:
friend class CallOpBuffer;
grpc_byte_buffer* buffer() const { return buffer_; }
grpc_byte_buffer* buffer_;
};
template <>
class SerializationTraits<ByteBuffer, void> {
public:
static Status Deserialize(grpc_byte_buffer* byte_buffer, ByteBuffer* dest, int max_message_size) {
dest->set_buffer(byte_buffer);
return Status::OK;
}
};
} // namespace grpc
#endif // GRPCXX_BYTE_BUFFER_H

@ -131,6 +131,12 @@ class ClientContext {
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);
grpc_call* call() { return call_; }
void set_call(grpc_call* call,

@ -54,6 +54,14 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class ChannelInterface;
class ClientContext;
@ -62,6 +70,7 @@ class RpcMethod;
class Server;
class ServerBuilder;
class ServerContext;
class Status;
class CompletionQueueTag {
public:
@ -120,6 +129,14 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
friend class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>

@ -34,8 +34,6 @@
#ifndef GRPCXX_CONFIG_PROTOBUF_H
#define GRPCXX_CONFIG_PROTOBUF_H
#include <grpc++/impl/serialization_traits.h>
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64

@ -38,6 +38,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
#include <grpc++/impl/serialization_traits.h>
#include <memory>
#include <map>
@ -53,7 +54,7 @@ class Call;
class CallNoOp {
protected:
void AddOp(grpc_op* ops, size_t* nops) {}
void FinishOp(void* tag, bool* status) {}
void FinishOp(void* tag, bool* status, int max_message_size) {}
};
class CallOpSendInitialMetadata {
@ -62,24 +63,71 @@ class CallOpSendInitialMetadata {
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpSendMessage {
public:
CallOpSendMessage() : send_buf_(nullptr) {}
template <class M>
void SendMessage(const M& message);
bool SendMessage(const M& message) {
return SerializationTraits<M>::Serialize(message, &send_buf_);
}
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void AddOp(grpc_op* ops, size_t* nops) {
if (send_buf_ == nullptr) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = send_buf_;
}
void FinishOp(void* tag, bool* status, int max_message_size) {
grpc_byte_buffer_destroy(send_buf_);
}
private:
grpc_byte_buffer* send_buf_;
};
template <class M>
template <class R>
class CallOpRecvMessage {
public:
CallOpRecvMessage() : got_message(false), message_(nullptr) {}
void RecvMessage(R* message) {
message_ = message;
}
bool got_message;
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void AddOp(grpc_op* ops, size_t* nops) {
if (message_ == nullptr) return;
grpc_op *op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &recv_buf_;
}
void FinishOp(void* tag, bool* status, int max_message_size) {
if (message_ == nullptr) return;
if (recv_buf_) {
if (*status) {
got_message = true;
*status = SerializationTraits<R>::Deserialize(recv_buf_, message_, max_message_size).IsOk();
} else {
got_message = false;
grpc_byte_buffer_destroy(recv_buf_);
}
} else {
got_message = false;
*status = false;
}
}
private:
R* message_;
grpc_byte_buffer* recv_buf_;
};
class CallOpGenericRecvMessage {
@ -87,9 +135,11 @@ class CallOpGenericRecvMessage {
template <class R>
void RecvMessage(R* message);
bool got_message;
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpClientSendClose {
@ -98,7 +148,7 @@ class CallOpClientSendClose {
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpServerSendStatus {
@ -107,7 +157,7 @@ class CallOpServerSendStatus {
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpRecvInitialMetadata {
@ -116,7 +166,7 @@ class CallOpRecvInitialMetadata {
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpClientRecvStatus {
@ -125,12 +175,18 @@ class CallOpClientRecvStatus {
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpSetInterface : public CompletionQueueTag {
public:
CallOpSetInterface() : max_message_size_(0) {}
virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
void set_max_message_size(int max_message_size) { max_message_size_ = max_message_size; }
protected:
int max_message_size_;
};
template <class T, int I>
@ -145,27 +201,28 @@ public WrapAndDerive<Op4, 4>,
public WrapAndDerive<Op5, 5>,
public WrapAndDerive<Op6, 6> {
public:
CallOpSet() : return_tag_(this) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
this->Op3::AddOp(ops, nops);
this->Op4::AddOp(ops, nops);
this->Op5::AddOp(ops, nops);
this->Op6::AddOp(ops, nops);
this->WrapAndDerive<Op1, 1>::AddOp(ops, nops);
this->WrapAndDerive<Op2, 2>::AddOp(ops, nops);
this->WrapAndDerive<Op3, 3>::AddOp(ops, nops);
this->WrapAndDerive<Op4, 4>::AddOp(ops, nops);
this->WrapAndDerive<Op5, 5>::AddOp(ops, nops);
this->WrapAndDerive<Op6, 6>::AddOp(ops, nops);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
this->Op1::FinishOp(*tag, status);
this->Op2::FinishOp(*tag, status);
this->Op3::FinishOp(*tag, status);
this->Op4::FinishOp(*tag, status);
this->Op5::FinishOp(*tag, status);
this->Op6::FinishOp(*tag, status);
this->WrapAndDerive<Op1, 1>::FinishOp(*tag, status, max_message_size_);
this->WrapAndDerive<Op2, 2>::FinishOp(*tag, status, max_message_size_);
this->WrapAndDerive<Op3, 3>::FinishOp(*tag, status, max_message_size_);
this->WrapAndDerive<Op4, 4>::FinishOp(*tag, status, max_message_size_);
this->WrapAndDerive<Op5, 5>::FinishOp(*tag, status, max_message_size_);
this->WrapAndDerive<Op6, 6>::FinishOp(*tag, status, max_message_size_);
*tag = return_tag_;
return true;
}
void SetOutputTag(void* return_tag) { return_tag_ = return_tag; }
void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
private:
void *return_tag_;

@ -62,12 +62,12 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
CallOpClientSendClose,
CallOpClientRecvStatus> ops;
Status status;
ops.AddSendInitialMetadata(context);
ops.AddSendMessage(request);
ops.AddRecvInitialMetadata(context);
ops.AddRecvMessage(result);
ops.AddClientSendClose();
ops.AddClientRecvStatus(context, &status);
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendMessage(request);
ops.RecvInitialMetadata(context);
ops.RecvMessage(result);
ops.ClientSendClose();
ops.ClientRecvStatus(context, &status);
call.PerformOps(&ops);
GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk());
return status;

@ -34,7 +34,11 @@
#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
#include <grpc++/config.h>
#include <type_traits>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/config_protobuf.h>
#include <grpc++/status.h>
struct grpc_byte_buffer;
@ -47,8 +51,19 @@ bool SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg.
bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size);
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size);
template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<grpc::protobuf::Message, T>::value>::type> {
public:
static bool Serialize(const grpc::protobuf::Message& msg, grpc_byte_buffer** buffer) {
return SerializeProto(msg, buffer);
}
static Status Deserialize(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) {
return DeserializeProto(buffer, msg, max_message_size);
}
};
} // namespace grpc

@ -56,15 +56,15 @@ class MethodHandler {
virtual ~MethodHandler() {}
struct HandlerParameter {
HandlerParameter(Call* c, ServerContext* context,
const grpc::protobuf::Message* req,
grpc::protobuf::Message* resp)
: call(c), server_context(context), request(req), response(resp) {}
grpc_byte_buffer* req, int max_size)
: call(c), server_context(context), request(req), max_message_size(max_size) {}
Call* call;
ServerContext* server_context;
const grpc::protobuf::Message* request;
grpc::protobuf::Message* response;
// Handler required to grpc_byte_buffer_destroy this
grpc_byte_buffer* request;
int max_message_size;
};
virtual Status RunHandler(const HandlerParameter& param) = 0;
virtual void RunHandler(const HandlerParameter& param) = 0;
};
// A wrapper class of an application provided rpc method handler.
@ -77,11 +77,23 @@ class RpcMethodHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
// Invoke application function, cast proto messages to their actual types.
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request),
dynamic_cast<ResponseType*>(param.response));
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size);
ResponseType rsp;
if (status.IsOk()) {
status = func_(service_, param.server_context, &req, &rsp);
}
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.IsOk()) {
ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -102,10 +114,20 @@ class ClientStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
return func_(service_, param.server_context, &reader,
dynamic_cast<ResponseType*>(param.response));
ResponseType rsp;
Status status = func_(service_, param.server_context, &reader, &rsp);
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.IsOk()) {
ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -124,10 +146,22 @@ class ServerStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerWriter<ResponseType> writer(param.call, param.server_context);
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request), &writer);
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size);
if (status.IsOk()) {
ServerWriter<ResponseType> writer(param.call, param.server_context);
status = func_(service_, param.server_context, &req, &writer);
}
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -147,10 +181,18 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
return func_(service_, param.server_context, &stream);
Status status = func_(service_, param.server_context, &stream);
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
@ -162,29 +204,16 @@ class BidiStreamingHandler : public MethodHandler {
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
// Takes ownership of the handler and two prototype objects.
// Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
MethodHandler* handler,
grpc::protobuf::Message* request_prototype,
grpc::protobuf::Message* response_prototype)
MethodHandler* handler)
: RpcMethod(name, type, nullptr),
handler_(handler),
request_prototype_(request_prototype),
response_prototype_(response_prototype) {}
handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }
grpc::protobuf::Message* AllocateRequestProto() {
return request_prototype_->New();
}
grpc::protobuf::Message* AllocateResponseProto() {
return response_prototype_->New();
}
private:
std::unique_ptr<MethodHandler> handler_;
std::unique_ptr<grpc::protobuf::Message> request_prototype_;
std::unique_ptr<grpc::protobuf::Message> response_prototype_;
};
// This class contains all the method information for an rpc service. It is

@ -38,12 +38,9 @@ struct grpc_byte_buffer;
namespace grpc {
template <class Message>
template <class Message, class UnusedButHereForPartialTemplateSpecialization = void>
class SerializationTraits;
typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest);
typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst);
} // namespace grpc
#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H

@ -41,13 +41,13 @@
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/impl/sync.h>
#include <grpc++/status.h>
struct grpc_server;
namespace grpc {
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
@ -99,6 +99,44 @@ class Server GRPC_FINAL : public GrpcLibrary,
void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE;
class BaseAsyncRequest : public CompletionQueueTag {
public:
BaseAsyncRequest(Server* server,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag);
private:
};
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: BaseAsyncRequest(server, stream, call_cq, notification_cq, tag) {}
};
class NoPayloadAsyncRequest : public RegisteredAsyncRequest {
public:
NoPayloadAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, notification_cq, tag) {
}
};
template <class Message>
class PayloadAsyncRequest : public RegisteredAsyncRequest {
PayloadAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, notification_cq, tag) {
}
};
class GenericAsyncRequest : public BaseAsyncRequest {
};
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
@ -139,8 +177,6 @@ class Server GRPC_FINAL : public GrpcLibrary,
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
private:
Server() : max_message_size_(-1), server_(NULL) { abort(); }
};
} // namespace grpc

@ -60,6 +60,14 @@ template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class Call;
class CallOpBuffer;
@ -105,6 +113,14 @@ class ServerContext {
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
template <class ServiceType, class RequestType, class ResponseType>
friend class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
// Prevent copying.
ServerContext(const ServerContext&);

@ -161,8 +161,8 @@ class ClientWriter : public ClientWriterInterface<W> {
call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpRecvMessage<R>> ops;
ops.AddSendInitialMetadata(&context->send_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@ -413,7 +413,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
const RpcMethod& method, ClientContext* context,
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.SetOutputTag(tag);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendMessage(request);
init_ops_.ClientSendClose();
@ -423,13 +423,13 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_ops_.SetOutputTag(tag);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_ops_.SetOutputTag(tag);
read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
read_ops_.RecvInitialMetadata(context_);
}
@ -438,7 +438,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_ops_.SetOutputTag(tag);
finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_);
}
@ -473,7 +473,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
call_(channel->CreateCall(method, context, cq)) {
finish_ops_.RecvMessage(response);
init_ops_.SetOutputTag(tag);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
@ -481,25 +481,25 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_ops_.SetOutputTag(tag);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.SetOutputTag(tag);
write_ops_.set_output_tag(tag);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_ops_.SetOutputTag(tag);
writes_done_ops_.set_output_tag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_ops_.SetOutputTag(tag);
finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_);
}
@ -534,7 +534,7 @@ class ClientAsyncReaderWriter GRPC_FINAL
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.SetOutputTag(tag);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
@ -542,34 +542,34 @@ class ClientAsyncReaderWriter GRPC_FINAL
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_ops_.SetOutputTag(tag);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_ops_.SetOutputTag(tag);
read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
read_ops_.RecvInitialMetadata(context_);
}
read_ops_.AddRecvMessage(msg);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.SetOutputTag(tag);
write_ops_.set_output_tag(tag);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_ops_.SetOutputTag(tag);
writes_done_ops_.set_output_tag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_ops_.SetOutputTag(tag);
finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_);
}
@ -598,20 +598,20 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.SetOutputTag(tag);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_ops_.SetOutputTag(tag);
read_ops_.set_output_tag(tag);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_ops_.SetOutputTag(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@ -626,7 +626,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_ops_.SetOutputTag(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@ -655,14 +655,14 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.SetOutputTag(tag);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.SetOutputTag(tag);
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@ -672,7 +672,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
}
void Finish(const Status& status, void* tag) {
finish_ops_.SetOutputTag(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@ -703,20 +703,20 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.SetOutputTag(tag);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_ops_.SetOutputTag(tag);
read_ops_.AddRecvMessage(msg);
read_ops_.set_output_tag(tag);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.SetOutputTag(tag);
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
@ -726,7 +726,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
}
void Finish(const Status& status, void* tag) {
finish_ops_.SetOutputTag(tag);
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;

@ -113,6 +113,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
@ -1045,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@ -1055,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@ -1065,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
@ -1075,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
printer->Print("return service_;\n");

@ -41,7 +41,6 @@
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
return Call(c_call, this, cq);
}
void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = MAX_OPS;
grpc_op ops[MAX_OPS];
size_t nops = 0;
grpc_op cops[MAX_OPS];
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
buf->FillOps(ops, &nops);
ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), ops, nops, buf));
grpc_call_start_batch(call->call(), cops, nops, ops));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}

@ -44,7 +44,7 @@ struct grpc_channel;
namespace grpc {
class Call;
class CallOpBuffer;
class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
@ -59,7 +59,7 @@ class Channel GRPC_FINAL : public GrpcLibrary,
virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;

@ -39,10 +39,10 @@
#include <grpc++/channel_interface.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
namespace grpc {
#if 0
CallOpBuffer::CallOpBuffer()
: return_tag_(this),
send_initial_metadata_(false),
@ -338,6 +338,7 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
}
return true;
}
#endif
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
: call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
@ -349,11 +350,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
call_(call),
max_message_size_(max_message_size) {}
void Call::PerformOps(CallOpBuffer* buffer) {
void Call::PerformOps(CallOpSetInterface* ops) {
if (max_message_size_ > 0) {
buffer->set_max_message_size(max_message_size_);
ops->set_max_message_size(max_message_size_);
}
call_hook_->PerformOpsOnCall(buffer, this);
call_hook_->PerformOpsOnCall(ops, this);
}
} // namespace grpc

@ -31,7 +31,7 @@
*
*/
#include "src/cpp/proto/proto_utils.h"
#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
@ -157,15 +157,23 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
return msg.SerializeToZeroCopyStream(&writer);
}
bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size) {
if (!buffer) return false;
if (!buffer) {
return Status(INVALID_ARGUMENT, "No payload");
}
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
if (!msg->ParseFromCodedStream(&decoder)) {
return Status(INVALID_ARGUMENT, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
return Status(INVALID_ARGUMENT, "Did not read entire message");
}
return Status::OK;
}
} // namespace grpc

@ -48,7 +48,6 @@
#include <grpc++/time.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@ -68,10 +67,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
in_flight_(false),
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::CLIENT_STREAMING) {
RpcMethod::SERVER_STREAMING) {
grpc_metadata_array_init(&request_metadata_);
}
@ -116,7 +112,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
@ -133,35 +128,9 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
void Run() {
std::unique_ptr<grpc::protobuf::Message> req;
std::unique_ptr<grpc::protobuf::Message> res;
if (has_request_payload_) {
GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
req.reset(method_->AllocateRequestProto());
if (!DeserializeProto(request_payload_, req.get(),
call_.max_message_size())) {
// FIXME(yangg) deal with deserialization failure
cq_.Shutdown();
return;
}
GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
}
if (has_response_payload_) {
res.reset(method_->AllocateResponseProto());
}
ctx_.BeginCompletionOp(&call_);
auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
CallOpBuffer buf;
if (!ctx_.sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
}
if (has_response_payload_) {
buf.AddSendMessage(*res);
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
call_.PerformOps(&buf);
cq_.Pluck(&buf); /* status ignored */
method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_, call_.max_message_size()));
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
@ -173,7 +142,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Call call_;
ServerContext ctx_;
const bool has_request_payload_;
const bool has_response_payload_;
grpc_byte_buffer* request_payload_;
RpcServiceMethod* const method_;
};
@ -183,7 +151,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
const bool has_response_payload_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
@ -251,9 +218,9 @@ bool Server::RegisterService(RpcService* service) {
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
GPR_ASSERT(service->dispatch_impl_ == nullptr &&
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->dispatch_impl_ = this;
service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
@ -318,15 +285,16 @@ void Server::Wait() {
}
}
void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = MAX_OPS;
grpc_op ops[MAX_OPS];
buf->FillOps(ops, &nops);
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), ops, nops, buf));
grpc_call_start_batch(call->call(), cops, nops, ops));
}
#if 0
class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
@ -352,9 +320,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
notification_cq->cq(), this);
}
AsyncRequest(Server* server, GenericServerContext* ctx,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
AsyncRequest()
: tag_(tag),
request_(nullptr),
stream_(stream),
@ -454,6 +420,7 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context,
void* tag) {
new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
}
#endif
void Server::ScheduleCallback() {
{

@ -43,12 +43,12 @@ namespace grpc {
// CompletionOp
class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
CompletionOp() : refs_(2), finalized_(false), cancelled_(false) {
AddServerRecvClose(&cancelled_);
}
CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
grpc::mutex mu_;
int refs_;
bool finalized_;
bool cancelled_;
int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
@ -73,14 +73,19 @@ void ServerContext::CompletionOp::Unref() {
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
return finalized_ ? cancelled_ : false;
return finalized_ ? cancelled_ != 0 : false;
}
void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
*nops = 1;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
if (!*status) cancelled_ = true;
if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;

@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/client_unary_call.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

@ -199,8 +199,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">

@ -16,9 +16,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>

@ -193,8 +193,6 @@
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\credentials.cc">

@ -10,9 +10,6 @@
<ClCompile Include="..\..\src\cpp\client\client_context.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\client_unary_call.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>
<ClCompile Include="..\..\src\cpp\client\create_channel.cc">
<Filter>src\cpp\client</Filter>
</ClCompile>

Loading…
Cancel
Save