diff --git a/BUILD b/BUILD index bc8fa55a378..adad9597b07 100644 --- a/BUILD +++ b/BUILD @@ -576,7 +576,6 @@ cc_library( "src/cpp/client/channel.cc", "src/cpp/client/channel_arguments.cc", "src/cpp/client/client_context.cc", - "src/cpp/client/client_unary_call.cc", "src/cpp/client/create_channel.cc", "src/cpp/client/credentials.cc", "src/cpp/client/generic_stub.cc", @@ -656,7 +655,6 @@ cc_library( "src/cpp/client/channel.cc", "src/cpp/client/channel_arguments.cc", "src/cpp/client/client_context.cc", - "src/cpp/client/client_unary_call.cc", "src/cpp/client/create_channel.cc", "src/cpp/client/credentials.cc", "src/cpp/client/generic_stub.cc", diff --git a/Makefile b/Makefile index 1647daaa813..cfea847ab68 100644 --- a/Makefile +++ b/Makefile @@ -3325,7 +3325,6 @@ LIBGRPC++_SRC = \ src/cpp/client/channel.cc \ src/cpp/client/channel_arguments.cc \ src/cpp/client/client_context.cc \ - src/cpp/client/client_unary_call.cc \ src/cpp/client/create_channel.cc \ src/cpp/client/credentials.cc \ src/cpp/client/generic_stub.cc \ @@ -3613,7 +3612,6 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/client/channel.cc \ src/cpp/client/channel_arguments.cc \ src/cpp/client/client_context.cc \ - src/cpp/client/client_unary_call.cc \ src/cpp/client/create_channel.cc \ src/cpp/client/credentials.cc \ src/cpp/client/generic_stub.cc \ diff --git a/build.json b/build.json index 0ff85a5c0c3..710475ca520 100644 --- a/build.json +++ b/build.json @@ -72,7 +72,6 @@ "src/cpp/client/channel.cc", "src/cpp/client/channel_arguments.cc", "src/cpp/client/client_context.cc", - "src/cpp/client/client_unary_call.cc", "src/cpp/client/create_channel.cc", "src/cpp/client/credentials.cc", "src/cpp/client/generic_stub.cc", diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h index a296102c3a0..d009ac6bbfe 100644 --- a/include/grpc++/async_unary_call.h +++ b/include/grpc++/async_unary_call.h @@ -51,7 +51,6 @@ class ClientAsyncResponseReaderInterface { virtual ~ClientAsyncResponseReaderInterface() {} virtual void ReadInitialMetadata(void* tag) = 0; virtual void Finish(R* msg, Status* status, void* tag) = 0; - }; template @@ -72,15 +71,15 @@ class ClientAsyncResponseReader GRPC_FINAL void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.SetOutputTag(tag); + meta_buf_.set_output_tag(tag); meta_buf_.RecvInitialMetadata(context_); call_.PerformOps(&meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.SetOutputTag(tag); + finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_buf_.RecvInitialMetadata(context_); } finish_buf_.RecvMessage(msg); finish_buf_.ClientRecvStatus(context_, status); @@ -92,7 +91,7 @@ class ClientAsyncResponseReader GRPC_FINAL Call call_; SneakyCallOpSet init_buf_; CallOpSet meta_buf_; - CallOpSet, CallOpClientRecvStatus> finish_buf_; + CallOpSet, CallOpClientRecvStatus> finish_buf_; }; template @@ -105,14 +104,14 @@ class ServerAsyncResponseWriter GRPC_FINAL void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.SetOutputTag(tag); + meta_buf_.set_output_tag(tag); meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.SetOutputTag(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; @@ -127,7 +126,7 @@ class ServerAsyncResponseWriter GRPC_FINAL void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.IsOk()); - finish_buf_.SetOutputTag(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/byte_buffer.h index ceb62622fdb..80dedda94c6 100644 --- a/include/grpc++/byte_buffer.h +++ b/include/grpc++/byte_buffer.h @@ -38,6 +38,8 @@ #include #include #include +#include +#include #include @@ -60,9 +62,6 @@ class ByteBuffer GRPC_FINAL { void Clear(); size_t Length(); - private: - friend class CallOpBuffer; - // takes ownership void set_buffer(grpc_byte_buffer* buf) { if (buffer_) { @@ -72,11 +71,23 @@ class ByteBuffer GRPC_FINAL { buffer_ = buf; } + private: + friend class CallOpBuffer; + grpc_byte_buffer* buffer() const { return buffer_; } grpc_byte_buffer* buffer_; }; +template <> +class SerializationTraits { + public: + static Status Deserialize(grpc_byte_buffer* byte_buffer, ByteBuffer* dest, int max_message_size) { + dest->set_buffer(byte_buffer); + return Status::OK; + } +}; + } // namespace grpc #endif // GRPCXX_BYTE_BUFFER_H diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 6d9015f278c..d444c9035f4 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -131,6 +131,12 @@ class ClientContext { friend class ::grpc::ClientAsyncReaderWriter; template friend class ::grpc::ClientAsyncResponseReader; + template + friend Status BlockingUnaryCall(ChannelInterface* channel, + const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); grpc_call* call() { return call_; } void set_call(grpc_call* call, diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index b45b5e2e71e..04900739377 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -54,6 +54,14 @@ template class ServerWriter; template class ServerReaderWriter; +template +class RpcMethodHandler; +template +class ClientStreamingHandler; +template +class ServerStreamingHandler; +template +class BidiStreamingHandler; class ChannelInterface; class ClientContext; @@ -62,6 +70,7 @@ class RpcMethod; class Server; class ServerBuilder; class ServerContext; +class Status; class CompletionQueueTag { public: @@ -120,6 +129,14 @@ class CompletionQueue : public GrpcLibrary { friend class ::grpc::ServerWriter; template friend class ::grpc::ServerReaderWriter; + template + friend class RpcMethodHandler; + template + friend class ClientStreamingHandler; + template + friend class ServerStreamingHandler; + template + friend class BidiStreamingHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; template diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h index 5ef1be1aa99..f6938b02ce9 100644 --- a/include/grpc++/config_protobuf.h +++ b/include/grpc++/config_protobuf.h @@ -34,8 +34,6 @@ #ifndef GRPCXX_CONFIG_PROTOBUF_H #define GRPCXX_CONFIG_PROTOBUF_H -#include - #ifndef GRPC_CUSTOM_PROTOBUF_INT64 #include #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index f8b290a851f..3701e403de0 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -53,7 +54,7 @@ class Call; class CallNoOp { protected: void AddOp(grpc_op* ops, size_t* nops) {} - void FinishOp(void* tag, bool* status) {} + void FinishOp(void* tag, bool* status, int max_message_size) {} }; class CallOpSendInitialMetadata { @@ -62,24 +63,71 @@ class CallOpSendInitialMetadata { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpSendMessage { public: + CallOpSendMessage() : send_buf_(nullptr) {} + template - void SendMessage(const M& message); + bool SendMessage(const M& message) { + return SerializationTraits::Serialize(message, &send_buf_); + } protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void AddOp(grpc_op* ops, size_t* nops) { + if (send_buf_ == nullptr) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = send_buf_; + } + void FinishOp(void* tag, bool* status, int max_message_size) { + grpc_byte_buffer_destroy(send_buf_); + } + + private: + grpc_byte_buffer* send_buf_; }; -template +template class CallOpRecvMessage { + public: + CallOpRecvMessage() : got_message(false), message_(nullptr) {} + + void RecvMessage(R* message) { + message_ = message; + } + + bool got_message; + protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void AddOp(grpc_op* ops, size_t* nops) { + if (message_ == nullptr) return; + grpc_op *op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &recv_buf_; + } + + void FinishOp(void* tag, bool* status, int max_message_size) { + if (message_ == nullptr) return; + if (recv_buf_) { + if (*status) { + got_message = true; + *status = SerializationTraits::Deserialize(recv_buf_, message_, max_message_size).IsOk(); + } else { + got_message = false; + grpc_byte_buffer_destroy(recv_buf_); + } + } else { + got_message = false; + *status = false; + } + } + + private: + R* message_; + grpc_byte_buffer* recv_buf_; }; class CallOpGenericRecvMessage { @@ -87,9 +135,11 @@ class CallOpGenericRecvMessage { template void RecvMessage(R* message); + bool got_message; + protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpClientSendClose { @@ -98,7 +148,7 @@ class CallOpClientSendClose { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpServerSendStatus { @@ -107,7 +157,7 @@ class CallOpServerSendStatus { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpRecvInitialMetadata { @@ -116,7 +166,7 @@ class CallOpRecvInitialMetadata { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpClientRecvStatus { @@ -125,12 +175,18 @@ class CallOpClientRecvStatus { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpSetInterface : public CompletionQueueTag { public: + CallOpSetInterface() : max_message_size_(0) {} virtual void FillOps(grpc_op* ops, size_t* nops) = 0; + + void set_max_message_size(int max_message_size) { max_message_size_ = max_message_size; } + + protected: + int max_message_size_; }; template @@ -145,27 +201,28 @@ public WrapAndDerive, public WrapAndDerive, public WrapAndDerive { public: + CallOpSet() : return_tag_(this) {} void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE { - this->Op1::AddOp(ops, nops); - this->Op2::AddOp(ops, nops); - this->Op3::AddOp(ops, nops); - this->Op4::AddOp(ops, nops); - this->Op5::AddOp(ops, nops); - this->Op6::AddOp(ops, nops); + this->WrapAndDerive::AddOp(ops, nops); + this->WrapAndDerive::AddOp(ops, nops); + this->WrapAndDerive::AddOp(ops, nops); + this->WrapAndDerive::AddOp(ops, nops); + this->WrapAndDerive::AddOp(ops, nops); + this->WrapAndDerive::AddOp(ops, nops); } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - this->Op1::FinishOp(*tag, status); - this->Op2::FinishOp(*tag, status); - this->Op3::FinishOp(*tag, status); - this->Op4::FinishOp(*tag, status); - this->Op5::FinishOp(*tag, status); - this->Op6::FinishOp(*tag, status); + this->WrapAndDerive::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive::FinishOp(*tag, status, max_message_size_); *tag = return_tag_; return true; } - void SetOutputTag(void* return_tag) { return_tag_ = return_tag; } + void set_output_tag(void* return_tag) { return_tag_ = return_tag; } private: void *return_tag_; diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 561c4721ef4..8c42fb47927 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -62,12 +62,12 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, CallOpClientSendClose, CallOpClientRecvStatus> ops; Status status; - ops.AddSendInitialMetadata(context); - ops.AddSendMessage(request); - ops.AddRecvInitialMetadata(context); - ops.AddRecvMessage(result); - ops.AddClientSendClose(); - ops.AddClientRecvStatus(context, &status); + ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendMessage(request); + ops.RecvInitialMetadata(context); + ops.RecvMessage(result); + ops.ClientSendClose(); + ops.ClientRecvStatus(context, &status); call.PerformOps(&ops); GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk()); return status; diff --git a/src/cpp/proto/proto_utils.h b/include/grpc++/impl/proto_utils.h similarity index 75% rename from src/cpp/proto/proto_utils.h rename to include/grpc++/impl/proto_utils.h index 67a775b3ca5..1a0cc31a8ac 100644 --- a/src/cpp/proto/proto_utils.h +++ b/include/grpc++/impl/proto_utils.h @@ -34,7 +34,11 @@ #ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H #define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H -#include +#include + +#include +#include +#include struct grpc_byte_buffer; @@ -47,8 +51,19 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** buffer); // The caller keeps ownership of buffer and msg. -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, - int max_message_size); +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size); + +template +class SerializationTraits::value>::type> { + public: + static bool Serialize(const grpc::protobuf::Message& msg, grpc_byte_buffer** buffer) { + return SerializeProto(msg, buffer); + } + static Status Deserialize(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) { + return DeserializeProto(buffer, msg, max_message_size); + } +}; } // namespace grpc diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 50204d20999..05bba6ef7c3 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -56,15 +56,15 @@ class MethodHandler { virtual ~MethodHandler() {} struct HandlerParameter { HandlerParameter(Call* c, ServerContext* context, - const grpc::protobuf::Message* req, - grpc::protobuf::Message* resp) - : call(c), server_context(context), request(req), response(resp) {} + grpc_byte_buffer* req, int max_size) + : call(c), server_context(context), request(req), max_message_size(max_size) {} Call* call; ServerContext* server_context; - const grpc::protobuf::Message* request; - grpc::protobuf::Message* response; + // Handler required to grpc_byte_buffer_destroy this + grpc_byte_buffer* request; + int max_message_size; }; - virtual Status RunHandler(const HandlerParameter& param) = 0; + virtual void RunHandler(const HandlerParameter& param) = 0; }; // A wrapper class of an application provided rpc method handler. @@ -77,11 +77,23 @@ class RpcMethodHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - // Invoke application function, cast proto messages to their actual types. - return func_(service_, param.server_context, - dynamic_cast(param.request), - dynamic_cast(param.response)); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits::Deserialize(param.request, &req, param.max_message_size); + ResponseType rsp; + if (status.IsOk()) { + status = func_(service_, param.server_context, &req, &rsp); + } + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.IsOk()) { + ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -102,10 +114,20 @@ class ClientStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReader reader(param.call, param.server_context); - return func_(service_, param.server_context, &reader, - dynamic_cast(param.response)); + ResponseType rsp; + Status status = func_(service_, param.server_context, &reader, &rsp); + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.IsOk()) { + ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -124,10 +146,22 @@ class ServerStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerWriter writer(param.call, param.server_context); - return func_(service_, param.server_context, - dynamic_cast(param.request), &writer); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits::Deserialize(param.request, &req, param.max_message_size); + + if (status.IsOk()) { + ServerWriter writer(param.call, param.server_context); + status = func_(service_, param.server_context, &req, &writer); + } + + CallOpSet ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -147,10 +181,18 @@ class BidiStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReaderWriter stream(param.call, param.server_context); - return func_(service_, param.server_context, &stream); + Status status = func_(service_, param.server_context, &stream); + + CallOpSet ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -162,29 +204,16 @@ class BidiStreamingHandler : public MethodHandler { // Server side rpc method class class RpcServiceMethod : public RpcMethod { public: - // Takes ownership of the handler and two prototype objects. + // Takes ownership of the handler RpcServiceMethod(const char* name, RpcMethod::RpcType type, - MethodHandler* handler, - grpc::protobuf::Message* request_prototype, - grpc::protobuf::Message* response_prototype) + MethodHandler* handler) : RpcMethod(name, type, nullptr), - handler_(handler), - request_prototype_(request_prototype), - response_prototype_(response_prototype) {} + handler_(handler) {} MethodHandler* handler() { return handler_.get(); } - grpc::protobuf::Message* AllocateRequestProto() { - return request_prototype_->New(); - } - grpc::protobuf::Message* AllocateResponseProto() { - return response_prototype_->New(); - } - private: std::unique_ptr handler_; - std::unique_ptr request_prototype_; - std::unique_ptr response_prototype_; }; // This class contains all the method information for an rpc service. It is diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h index d21ad92475d..4648bbfc330 100644 --- a/include/grpc++/impl/serialization_traits.h +++ b/include/grpc++/impl/serialization_traits.h @@ -38,12 +38,9 @@ struct grpc_byte_buffer; namespace grpc { -template +template class SerializationTraits; -typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest); -typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst); - } // namespace grpc #endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H diff --git a/include/grpc++/server.h b/include/grpc++/server.h index e0599ee7682..4784bace1c6 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -41,13 +41,13 @@ #include #include #include -#include #include #include struct grpc_server; namespace grpc { + class AsynchronousService; class GenericServerContext; class AsyncGenericService; @@ -99,6 +99,44 @@ class Server GRPC_FINAL : public GrpcLibrary, void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE; + class BaseAsyncRequest : public CompletionQueueTag { + public: + BaseAsyncRequest(Server* server, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag); + + private: + }; + + class RegisteredAsyncRequest : public BaseAsyncRequest { + public: + RegisteredAsyncRequest(Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) + : BaseAsyncRequest(server, stream, call_cq, notification_cq, tag) {} + }; + + class NoPayloadAsyncRequest : public RegisteredAsyncRequest { + public: + NoPayloadAsyncRequest(Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) + : RegisteredAsyncRequest(server, context, stream, call_cq, notification_cq, tag) { + } + }; + + template + class PayloadAsyncRequest : public RegisteredAsyncRequest { + PayloadAsyncRequest(Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) + : RegisteredAsyncRequest(server, context, stream, call_cq, notification_cq, tag) { + } + }; + + class GenericAsyncRequest : public BaseAsyncRequest { + }; + template void RequestAsyncCall(void* registered_method, ServerContext* context, ServerAsyncStreamingInterface* stream, @@ -139,8 +177,6 @@ class Server GRPC_FINAL : public GrpcLibrary, ThreadPoolInterface* thread_pool_; // Whether the thread pool is created and owned by the server. bool thread_pool_owned_; - private: - Server() : max_message_size_(-1), server_(NULL) { abort(); } }; } // namespace grpc diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index d88a3ae2627..326b6a125ce 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -60,6 +60,14 @@ template class ServerWriter; template class ServerReaderWriter; +template +class RpcMethodHandler; +template +class ClientStreamingHandler; +template +class ServerStreamingHandler; +template +class BidiStreamingHandler; class Call; class CallOpBuffer; @@ -105,6 +113,14 @@ class ServerContext { friend class ::grpc::ServerWriter; template friend class ::grpc::ServerReaderWriter; + template + friend class RpcMethodHandler; + template + friend class ClientStreamingHandler; + template + friend class ServerStreamingHandler; + template + friend class BidiStreamingHandler; // Prevent copying. ServerContext(const ServerContext&); diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 32ba03f8d8d..39a1cc85952 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -161,8 +161,8 @@ class ClientWriter : public ClientWriterInterface { call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); - CallOpSet> ops; - ops.AddSendInitialMetadata(&context->send_initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&ops); cq_.Pluck(&ops); } @@ -413,7 +413,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { const RpcMethod& method, ClientContext* context, const W& request, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.SetOutputTag(tag); + init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_); init_ops_.SendMessage(request); init_ops_.ClientSendClose(); @@ -423,13 +423,13 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.SetOutputTag(tag); + meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.SetOutputTag(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { read_ops_.RecvInitialMetadata(context_); } @@ -438,7 +438,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.SetOutputTag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); } @@ -473,7 +473,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { call_(channel->CreateCall(method, context, cq)) { finish_ops_.RecvMessage(response); - init_ops_.SetOutputTag(tag); + init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&init_ops_); } @@ -481,25 +481,25 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.SetOutputTag(tag); + meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.SetOutputTag(tag); + write_ops_.set_output_tag(tag); write_ops_.SendMessage(msg); call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.SetOutputTag(tag); + writes_done_ops_.set_output_tag(tag); writes_done_ops_.ClientSendClose(); call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.SetOutputTag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); } @@ -534,7 +534,7 @@ class ClientAsyncReaderWriter GRPC_FINAL const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.SetOutputTag(tag); + init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&init_ops_); } @@ -542,34 +542,34 @@ class ClientAsyncReaderWriter GRPC_FINAL void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.SetOutputTag(tag); + meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.SetOutputTag(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { read_ops_.RecvInitialMetadata(context_); } - read_ops_.AddRecvMessage(msg); + read_ops_.RecvMessage(msg); call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.SetOutputTag(tag); + write_ops_.set_output_tag(tag); write_ops_.SendMessage(msg); call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.SetOutputTag(tag); + writes_done_ops_.set_output_tag(tag); writes_done_ops_.ClientSendClose(); call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.SetOutputTag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { finish_ops_.RecvInitialMetadata(context_); } @@ -598,20 +598,20 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.SetOutputTag(tag); + meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.SetOutputTag(tag); + read_ops_.set_output_tag(tag); read_ops_.RecvMessage(msg); call_.PerformOps(&read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_ops_.SetOutputTag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; @@ -626,7 +626,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.IsOk()); - finish_ops_.SetOutputTag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; @@ -655,14 +655,14 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.SetOutputTag(tag); + meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.SetOutputTag(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; @@ -672,7 +672,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, } void Finish(const Status& status, void* tag) { - finish_ops_.SetOutputTag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; @@ -703,20 +703,20 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.SetOutputTag(tag); + meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.SetOutputTag(tag); - read_ops_.AddRecvMessage(msg); + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.SetOutputTag(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; @@ -726,7 +726,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, } void Finish(const Status& status, void* tag) { - finish_ops_.SetOutputTag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b0d2b5d2298..d03eaddf408 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -113,6 +113,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, grpc::string temp = "#include \n" "#include \n" + "#include \n" "#include \n" "#include \n" "#include \n" @@ -1045,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -1055,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::CLIENT_STREAMING,\n" " new ::grpc::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -1065,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::SERVER_STREAMING,\n" " new ::grpc::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, @@ -1075,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::BIDI_STREAMING,\n" " new ::grpc::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } } printer->Print("return service_;\n"); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 475a20d8832..6e6278cb058 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -41,7 +41,6 @@ #include #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" #include #include #include @@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, return Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { +void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; - size_t nops = MAX_OPS; - grpc_op ops[MAX_OPS]; + size_t nops = 0; + grpc_op cops[MAX_OPS]; GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); - buf->FillOps(ops, &nops); + ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), ops, nops, buf)); + grpc_call_start_batch(call->call(), cops, nops, ops)); GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); } diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index cd239247c82..69baa419a65 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -44,7 +44,7 @@ struct grpc_channel; namespace grpc { class Call; -class CallOpBuffer; +class CallOpSetInterface; class ChannelArguments; class CompletionQueue; class Credentials; @@ -59,7 +59,7 @@ class Channel GRPC_FINAL : public GrpcLibrary, virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE; virtual Call CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) GRPC_OVERRIDE; - virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; private: const grpc::string target_; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 1068111e3f4..dc3d36faa2d 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -39,10 +39,10 @@ #include #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" namespace grpc { +#if 0 CallOpBuffer::CallOpBuffer() : return_tag_(this), send_initial_metadata_(false), @@ -338,6 +338,7 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { } return true; } +#endif Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {} @@ -349,11 +350,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, call_(call), max_message_size_(max_message_size) {} -void Call::PerformOps(CallOpBuffer* buffer) { +void Call::PerformOps(CallOpSetInterface* ops) { if (max_message_size_ > 0) { - buffer->set_max_message_size(max_message_size_); + ops->set_max_message_size(max_message_size_); } - call_hook_->PerformOpsOnCall(buffer, this); + call_hook_->PerformOpsOnCall(ops, this); } } // namespace grpc diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 7a7e73bba4d..a4a37c70006 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -31,7 +31,7 @@ * */ -#include "src/cpp/proto/proto_utils.h" +#include #include #include @@ -157,15 +157,23 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { return msg.SerializeToZeroCopyStream(&writer); } -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) { - if (!buffer) return false; + if (!buffer) { + return Status(INVALID_ARGUMENT, "No payload"); + } GrpcBufferReader reader(buffer); ::grpc::protobuf::io::CodedInputStream decoder(&reader); if (max_message_size > 0) { decoder.SetTotalBytesLimit(max_message_size, max_message_size); } - return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage(); + if (!msg->ParseFromCodedStream(&decoder)) { + return Status(INVALID_ARGUMENT, msg->InitializationErrorString()); + } + if (!decoder.ConsumedEntireMessage()) { + return Status(INVALID_ARGUMENT, "Did not read entire message"); + } + return Status::OK; } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index dbd88c5b8c1..c08506c97ff 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -48,7 +48,6 @@ #include #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" namespace grpc { @@ -68,10 +67,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { in_flight_(false), has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == - RpcMethod::SERVER_STREAMING), - has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::CLIENT_STREAMING) { + RpcMethod::SERVER_STREAMING) { grpc_metadata_array_init(&request_metadata_); } @@ -116,7 +112,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), - has_response_payload_(mrd->has_response_payload_), request_payload_(mrd->request_payload_), method_(mrd->method_) { ctx_.call_ = mrd->call_; @@ -133,35 +128,9 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } void Run() { - std::unique_ptr req; - std::unique_ptr res; - if (has_request_payload_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call()); - req.reset(method_->AllocateRequestProto()); - if (!DeserializeProto(request_payload_, req.get(), - call_.max_message_size())) { - // FIXME(yangg) deal with deserialization failure - cq_.Shutdown(); - return; - } - GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call()); - } - if (has_response_payload_) { - res.reset(method_->AllocateResponseProto()); - } ctx_.BeginCompletionOp(&call_); - auto status = method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); - CallOpBuffer buf; - if (!ctx_.sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_.initial_metadata_); - } - if (has_response_payload_) { - buf.AddSendMessage(*res); - } - buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); - call_.PerformOps(&buf); - cq_.Pluck(&buf); /* status ignored */ + method_->handler()->RunHandler( + MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_, call_.max_message_size())); void* ignored_tag; bool ignored_ok; cq_.Shutdown(); @@ -173,7 +142,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { Call call_; ServerContext ctx_; const bool has_request_payload_; - const bool has_response_payload_; grpc_byte_buffer* request_payload_; RpcServiceMethod* const method_; }; @@ -183,7 +151,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; bool in_flight_; const bool has_request_payload_; - const bool has_response_payload_; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; @@ -251,9 +218,9 @@ bool Server::RegisterService(RpcService* service) { } bool Server::RegisterAsyncService(AsynchronousService* service) { - GPR_ASSERT(service->dispatch_impl_ == nullptr && + GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); - service->dispatch_impl_ = this; + service->server_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { void* tag = grpc_server_register_method(server_, service->method_names_[i], @@ -318,15 +285,16 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { +void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; - size_t nops = MAX_OPS; - grpc_op ops[MAX_OPS]; - buf->FillOps(ops, &nops); + size_t nops = 0; + grpc_op cops[MAX_OPS]; + ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), ops, nops, buf)); + grpc_call_start_batch(call->call(), cops, nops, ops)); } +#if 0 class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { public: AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, @@ -352,9 +320,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { notification_cq->cq(), this); } - AsyncRequest(Server* server, GenericServerContext* ctx, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) + AsyncRequest() : tag_(tag), request_(nullptr), stream_(stream), @@ -454,6 +420,7 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context, void* tag) { new AsyncRequest(this, context, stream, call_cq, notification_cq, tag); } +#endif void Server::ScheduleCallback() { { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 6b5e41d0a82..eea9645e37d 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -43,12 +43,12 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer { +class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - CompletionOp() : refs_(2), finalized_(false), cancelled_(false) { - AddServerRecvClose(&cancelled_); - } + CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {} + + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; bool CheckCancelled(CompletionQueue* cq); @@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer { grpc::mutex mu_; int refs_; bool finalized_; - bool cancelled_; + int cancelled_; }; void ServerContext::CompletionOp::Unref() { @@ -73,14 +73,19 @@ void ServerContext::CompletionOp::Unref() { bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { cq->TryPluck(this); grpc::lock_guard g(mu_); - return finalized_ ? cancelled_ : false; + return finalized_ ? cancelled_ != 0 : false; +} + +void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { + ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops->data.recv_close_on_server.cancelled = &cancelled_; + *nops = 1; } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { - GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status)); grpc::unique_lock lock(mu_); finalized_ = true; - if (!*status) cancelled_ = true; + if (!*status) cancelled_ = 1; if (--refs_ == 0) { lock.unlock(); delete this; diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index feac0244cc6..70bd0b68034 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -760,7 +760,7 @@ WARN_LOGFILE = # spaces. # Note: If this tag is empty the current directory is searched. -INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/client_unary_call.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc +INPUT = include/grpc++/async_generic_service.h include/grpc++/async_unary_call.h include/grpc++/byte_buffer.h include/grpc++/channel_arguments.h include/grpc++/channel_interface.h include/grpc++/client_context.h include/grpc++/completion_queue.h include/grpc++/config.h include/grpc++/create_channel.h include/grpc++/credentials.h include/grpc++/generic_stub.h include/grpc++/impl/call.h include/grpc++/impl/client_unary_call.h include/grpc++/impl/grpc_library.h include/grpc++/impl/internal_stub.h include/grpc++/impl/rpc_method.h include/grpc++/impl/rpc_service_method.h include/grpc++/impl/service_type.h include/grpc++/impl/sync.h include/grpc++/impl/sync_cxx11.h include/grpc++/impl/sync_no_cxx11.h include/grpc++/impl/thd.h include/grpc++/impl/thd_cxx11.h include/grpc++/impl/thd_no_cxx11.h include/grpc++/server.h include/grpc++/server_builder.h include/grpc++/server_context.h include/grpc++/server_credentials.h include/grpc++/slice.h include/grpc++/status.h include/grpc++/status_code_enum.h include/grpc++/stream.h include/grpc++/thread_pool_interface.h include/grpc++/time.h src/cpp/client/secure_credentials.h src/cpp/server/secure_server_credentials.h src/cpp/client/channel.h src/cpp/proto/proto_utils.h src/cpp/server/thread_pool.h src/cpp/client/secure_credentials.cc src/cpp/server/secure_server_credentials.cc src/cpp/client/channel.cc src/cpp/client/channel_arguments.cc src/cpp/client/client_context.cc src/cpp/client/create_channel.cc src/cpp/client/credentials.cc src/cpp/client/generic_stub.cc src/cpp/client/insecure_credentials.cc src/cpp/client/internal_stub.cc src/cpp/common/call.cc src/cpp/common/completion_queue.cc src/cpp/common/rpc_method.cc src/cpp/proto/proto_utils.cc src/cpp/server/async_generic_service.cc src/cpp/server/create_default_thread_pool.cc src/cpp/server/insecure_server_credentials.cc src/cpp/server/server.cc src/cpp/server/server_builder.cc src/cpp/server/server_context.cc src/cpp/server/server_credentials.cc src/cpp/server/thread_pool.cc src/cpp/util/byte_buffer.cc src/cpp/util/slice.cc src/cpp/util/status.cc src/cpp/util/time.cc # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses diff --git a/vsprojects/grpc++/grpc++.vcxproj b/vsprojects/grpc++/grpc++.vcxproj index d233f9e3d31..ee39df237c4 100644 --- a/vsprojects/grpc++/grpc++.vcxproj +++ b/vsprojects/grpc++/grpc++.vcxproj @@ -199,8 +199,6 @@ - - diff --git a/vsprojects/grpc++/grpc++.vcxproj.filters b/vsprojects/grpc++/grpc++.vcxproj.filters index dd375c7238f..19c7c93325e 100644 --- a/vsprojects/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/grpc++/grpc++.vcxproj.filters @@ -16,9 +16,6 @@ src\cpp\client - - src\cpp\client - src\cpp\client diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj index 9b2ef9137d5..61485170701 100644 --- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -193,8 +193,6 @@ - - diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index d616e336e47..6d37054df01 100644 --- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -10,9 +10,6 @@ src\cpp\client - - src\cpp\client - src\cpp\client