diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h index 786f8c71846..a296102c3a0 100644 --- a/include/grpc++/async_unary_call.h +++ b/include/grpc++/async_unary_call.h @@ -58,40 +58,41 @@ template class ClientAsyncResponseReader GRPC_FINAL : public ClientAsyncResponseReaderInterface { public: + template ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request) + const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - init_buf_.AddSendMessage(request); - init_buf_.AddClientSendClose(); + init_buf_.SendInitialMetadata(context->send_initial_metadata_); + init_buf_.SendMessage(request); + init_buf_.ClientSendClose(); call_.PerformOps(&init_buf_); } void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); + meta_buf_.SetOutputTag(tag); + meta_buf_.RecvInitialMetadata(context_); call_.PerformOps(&meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.Reset(tag); + finish_buf_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { finish_buf_.AddRecvInitialMetadata(context_); } - finish_buf_.AddRecvMessage(msg); - finish_buf_.AddClientRecvStatus(context_, status); + finish_buf_.RecvMessage(msg); + finish_buf_.ClientRecvStatus(context_, status); call_.PerformOps(&finish_buf_); } private: ClientContext* context_; Call call_; - SneakyCallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer finish_buf_; + SneakyCallOpSet init_buf_; + CallOpSet meta_buf_; + CallOpSet, CallOpClientRecvStatus> finish_buf_; }; template @@ -104,34 +105,34 @@ class ServerAsyncResponseWriter GRPC_FINAL void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_buf_.SetOutputTag(tag); + meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_buf_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.IsOk()) { - finish_buf_.AddSendMessage(msg); + finish_buf_.SendMessage(msg); } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.IsOk()); - finish_buf_.Reset(tag); + finish_buf_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -140,8 +141,8 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer finish_buf_; + CallOpSet meta_buf_; + CallOpSet finish_buf_; }; } // namespace grpc diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index e8429c8f417..b45b5e2e71e 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -35,7 +35,6 @@ #define GRPCXX_COMPLETION_QUEUE_H #include -#include #include #include @@ -56,7 +55,10 @@ class ServerWriter; template class ServerReaderWriter; +class ChannelInterface; +class ClientContext; class CompletionQueue; +class RpcMethod; class Server; class ServerBuilder; class ServerContext; @@ -120,11 +122,12 @@ class CompletionQueue : public GrpcLibrary { friend class ::grpc::ServerReaderWriter; friend class ::grpc::Server; friend class ::grpc::ServerContext; + template friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); + const InputMessage& request, + OutputMessage* result); NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); diff --git a/include/grpc++/config.h b/include/grpc++/config.h index 55b2a644822..af248b66e37 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -77,33 +77,6 @@ #define GRPC_OVERRIDE override #endif -#ifndef GRPC_CUSTOM_PROTOBUF_INT64 -#include -#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 -#endif - -#ifndef GRPC_CUSTOM_MESSAGE -#include -#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message -#endif - -#ifndef GRPC_CUSTOM_STRING -#include -#define GRPC_CUSTOM_STRING std::string -#endif - -#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM -#include -#include -#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ - ::google::protobuf::io::ZeroCopyOutputStream -#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ - ::google::protobuf::io::ZeroCopyInputStream -#define GRPC_CUSTOM_CODEDINPUTSTREAM \ - ::google::protobuf::io::CodedInputStream -#endif - - #ifdef GRPC_CXX0X_NO_NULLPTR #include const class { @@ -121,23 +94,15 @@ private: } nullptr = {}; #endif +#ifndef GRPC_CUSTOM_STRING +#include +#define GRPC_CUSTOM_STRING std::string +#endif + namespace grpc { typedef GRPC_CUSTOM_STRING string; -namespace protobuf { - -typedef GRPC_CUSTOM_MESSAGE Message; -typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; - -namespace io { -typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; -typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; -typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; -} // namespace io - -} // namespace protobuf - } // namespace grpc #endif // GRPCXX_CONFIG_H diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h new file mode 100644 index 00000000000..5ef1be1aa99 --- /dev/null +++ b/include/grpc++/config_protobuf.h @@ -0,0 +1,75 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_CONFIG_PROTOBUF_H +#define GRPCXX_CONFIG_PROTOBUF_H + +#include + +#ifndef GRPC_CUSTOM_PROTOBUF_INT64 +#include +#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 +#endif + +#ifndef GRPC_CUSTOM_MESSAGE +#include +#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message +#endif + +#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM +#include +#include +#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ + ::google::protobuf::io::ZeroCopyOutputStream +#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ + ::google::protobuf::io::ZeroCopyInputStream +#define GRPC_CUSTOM_CODEDINPUTSTREAM \ + ::google::protobuf::io::CodedInputStream +#endif + +namespace grpc { +namespace protobuf { + +typedef GRPC_CUSTOM_MESSAGE Message; +typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; + +namespace io { +typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; +typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; +typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; +} // namespace io + +} // namespace protobuf +} // namespace grpc + +#endif // GRPCXX_CONFIG_PROTOBUF_H diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index aae199db1b3..f8b290a851f 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -50,6 +50,128 @@ namespace grpc { class ByteBuffer; class Call; +class CallNoOp { + protected: + void AddOp(grpc_op* ops, size_t* nops) {} + void FinishOp(void* tag, bool* status) {} +}; + +class CallOpSendInitialMetadata { + public: + void SendInitialMetadata(const std::multimap& metadata); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpSendMessage { + public: + template + void SendMessage(const M& message); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +template +class CallOpRecvMessage { + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpGenericRecvMessage { + public: + template + void RecvMessage(R* message); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpClientSendClose { + public: + void ClientSendClose(); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpServerSendStatus { + public: + void ServerSendStatus(const std::multimap& trailing_metadata, const Status& status); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpRecvInitialMetadata { + public: + void RecvInitialMetadata(ClientContext* context); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpClientRecvStatus { + public: + void ClientRecvStatus(ClientContext* context, Status* status); + + protected: + void AddOp(grpc_op* ops, size_t* nops); + void FinishOp(void* tag, bool* status); +}; + +class CallOpSetInterface : public CompletionQueueTag { + public: + virtual void FillOps(grpc_op* ops, size_t* nops) = 0; +}; + +template +class WrapAndDerive : public T {}; + +template +class CallOpSet : public CallOpSetInterface, +public WrapAndDerive, +public WrapAndDerive, +public WrapAndDerive, +public WrapAndDerive, +public WrapAndDerive, +public WrapAndDerive { + public: + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE { + this->Op1::AddOp(ops, nops); + this->Op2::AddOp(ops, nops); + this->Op3::AddOp(ops, nops); + this->Op4::AddOp(ops, nops); + this->Op5::AddOp(ops, nops); + this->Op6::AddOp(ops, nops); + } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + this->Op1::FinishOp(*tag, status); + this->Op2::FinishOp(*tag, status); + this->Op3::FinishOp(*tag, status); + this->Op4::FinishOp(*tag, status); + this->Op5::FinishOp(*tag, status); + this->Op6::FinishOp(*tag, status); + *tag = return_tag_; + return true; + } + + void SetOutputTag(void* return_tag) { return_tag_ = return_tag; } + + private: + void *return_tag_; +}; + +#if 0 class CallOpBuffer : public CompletionQueueTag { public: CallOpBuffer(); @@ -122,12 +244,14 @@ class CallOpBuffer : public CompletionQueueTag { int cancelled_buf_; bool* recv_closed_; }; +#endif // SneakyCallOpBuffer does not post completions to the completion queue -class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { +template +class SneakyCallOpSet GRPC_FINAL : public CallOpSet { public: bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - return CallOpBuffer::FinalizeResult(tag, status) && false; + return CallOpSet::FinalizeResult(tag, status) && false; } }; @@ -135,7 +259,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { class CallHook { public: virtual ~CallHook() {} - virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; // Straightforward wrapping of the C call object @@ -146,7 +270,7 @@ class Call GRPC_FINAL { Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq, int max_message_size); - void PerformOps(CallOpBuffer* buffer); + void PerformOps(CallOpSetInterface* ops); grpc_call* call() { return call_; } CompletionQueue* cq() { return cq_; } diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 0e8aeed7816..561c4721ef4 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -36,6 +36,8 @@ #include +#include + namespace grpc { class ChannelInterface; @@ -45,10 +47,31 @@ class RpcMethod; class Status; // Wrapper that performs a blocking unary call +template Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); + const InputMessage& request, + OutputMessage* result) { + CompletionQueue cq; + Call call(channel->CreateCall(method, context, &cq)); + CallOpSet< + CallOpSendInitialMetadata, + CallOpSendMessage, + CallOpRecvInitialMetadata, + CallOpRecvMessage, + CallOpClientSendClose, + CallOpClientRecvStatus> ops; + Status status; + ops.AddSendInitialMetadata(context); + ops.AddSendMessage(request); + ops.AddRecvInitialMetadata(context); + ops.AddRecvMessage(result); + ops.AddClientSendClose(); + ops.AddClientRecvStatus(context, &status); + call.PerformOps(&ops); + GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk()); + return status; +} } // namespace grpc diff --git a/src/cpp/client/client_unary_call.cc b/include/grpc++/impl/serialization_traits.h similarity index 62% rename from src/cpp/client/client_unary_call.cc rename to include/grpc++/impl/serialization_traits.h index 7e7ea78bcde..d21ad92475d 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/include/grpc++/impl/serialization_traits.h @@ -31,34 +31,19 @@ * */ -#include -#include -#include -#include -#include -#include -#include +#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H +#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H + +struct grpc_byte_buffer; namespace grpc { -// Wrapper that performs a blocking unary call -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result) { - CompletionQueue cq; - Call call(channel->CreateCall(method, context, &cq)); - CallOpBuffer buf; - Status status; - buf.AddSendInitialMetadata(context); - buf.AddSendMessage(request); - buf.AddRecvInitialMetadata(context); - buf.AddRecvMessage(result); - buf.AddClientSendClose(); - buf.AddClientRecvStatus(context, &status); - call.PerformOps(&buf); - GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk()); - return status; -} +template +class SerializationTraits; + +typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest); +typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst); } // namespace grpc + +#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index bc39bb82ac3..af21d9b8cfa 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -35,6 +35,8 @@ #define GRPCXX_IMPL_SERVICE_TYPE_H #include +#include +#include namespace grpc { @@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface { class AsynchronousService { public: - // this is Server, but in disguise to avoid a link dependency - class DispatchImpl { - public: - virtual void RequestAsyncCall(void* registered_method, - ServerContext* context, - ::grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) = 0; - }; - AsynchronousService(const char** method_names, size_t method_count) - : dispatch_impl_(nullptr), + : server_(nullptr), method_names_(method_names), method_count_(method_count), request_args_(nullptr) {} @@ -86,42 +76,44 @@ class AsynchronousService { ~AsynchronousService() { delete[] request_args_; } protected: + template void RequestAsyncUnary(int index, ServerContext* context, - grpc::protobuf::Message* request, + Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, + stream, call_cq, notification_cq, tag, request); } void RequestClientStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, notification_cq, tag); } + template void RequestServerStreaming(int index, ServerContext* context, - grpc::protobuf::Message* request, + Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, + stream, call_cq, notification_cq, tag, request); } void RequestBidiStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, notification_cq, tag); } private: friend class Server; - DispatchImpl* dispatch_impl_; + Server* server_; const char** const method_names_; size_t method_count_; void** request_args_; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 2cfeb359fc0..e0599ee7682 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -53,13 +53,13 @@ class GenericServerContext; class AsyncGenericService; class RpcService; class RpcServiceMethod; +class ServerAsyncStreamingInterface; class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. class Server GRPC_FINAL : public GrpcLibrary, - private CallHook, - private AsynchronousService::DispatchImpl { + private CallHook { public: ~Server(); @@ -73,6 +73,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private: friend class AsyncGenericService; + friend class AsynchronousService; friend class ServerBuilder; class SyncRequest; @@ -96,15 +97,20 @@ class Server GRPC_FINAL : public GrpcLibrary, void RunRpc(); void ScheduleCallback(); - void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; + void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE; - // DispatchImpl + template void RequestAsyncCall(void* registered_method, ServerContext* context, - grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, - void* tag) GRPC_OVERRIDE; + void* tag, Message *message); + + void RequestAsyncCall(void* registered_method, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag); void RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index c836f98c2a4..32ba03f8d8d 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -93,15 +93,16 @@ template class ClientReader GRPC_FINAL : public ClientReaderInterface { public: // Blocking create a stream and write the first request out. + template ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const grpc::protobuf::Message& request) + ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - buf.AddSendMessage(request); - buf.AddClientSendClose(); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + CallOpSet ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendMessage(request); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } // Blocking wait for initial metadata from server. The received metadata @@ -111,28 +112,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { void WaitForInitialMetadata() { GPR_ASSERT(!context_->initial_metadata_received_); - CallOpBuffer buf; - buf.AddRecvInitialMetadata(context_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); // status ignored + CallOpSet ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet> ops; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(context_); + ops.RecvInitialMetadata(context_); } - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf) && buf.got_message; + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; } Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet ops; Status status; - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } @@ -150,48 +151,48 @@ class ClientWriterInterface : public ClientStreamingInterface, }; template -class ClientWriter GRPC_FINAL : public ClientWriterInterface { +class ClientWriter : public ClientWriterInterface { public: // Blocking create a stream. + template ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, grpc::protobuf::Message* response) + ClientContext* context, R* response) : context_(context), - response_(response), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + finish_ops_.RecvMessage(response); + + CallOpSet> ops; + ops.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet ops; + ops.SendMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } // Read the final response and wait for the final status. Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; Status status; - buf.AddRecvMessage(response_); - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); return status; } private: ClientContext* context_; - grpc::protobuf::Message* const response_; + CallOpSet finish_ops_; CompletionQueue cq_; Call call_; }; @@ -213,10 +214,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + CallOpSet ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } // Blocking wait for initial metadata from server. The received metadata @@ -226,42 +227,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { void WaitForInitialMetadata() { GPR_ASSERT(!context_->initial_metadata_received_); - CallOpBuffer buf; - buf.AddRecvInitialMetadata(context_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); // status ignored + CallOpSet ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet> ops; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(context_); + ops.RecvInitialMetadata(context_); } - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf) && buf.got_message; + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet ops; + ops.SendMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet ops; Status status; - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } @@ -279,18 +280,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface { void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf) && buf.got_message; + CallOpSet> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; } private: @@ -306,22 +307,22 @@ class ServerWriter GRPC_FINAL : public WriterInterface { void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet ops; if (!ctx_->sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - buf.AddSendMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + ops.SendMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); } private: @@ -339,29 +340,29 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf) && buf.got_message; + CallOpSet> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet ops; if (!ctx_->sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - buf.AddSendMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + ops.SendMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); } private: @@ -407,50 +408,51 @@ template class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { public: // Create a stream and write the first request out. + template ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, void* tag) + const W& request, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - init_buf_.AddSendMessage(request); - init_buf_.AddClientSendClose(); - call_.PerformOps(&init_buf_); + init_ops_.SetOutputTag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + init_ops_.SendMessage(request); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.SetOutputTag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); + read_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer finish_buf_; + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet finish_ops_; }; template @@ -463,56 +465,56 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { public: + template ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - grpc::protobuf::Message* response, void* tag) + R* response, void* tag) : context_(context), - response_(response), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&init_buf_); + finish_ops_.RecvMessage(response); + + init_ops_.SetOutputTag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.SetOutputTag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SetOutputTag(tag); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_buf_.Reset(tag); - writes_done_buf_.AddClientSendClose(); - call_.PerformOps(&writes_done_buf_); + writes_done_ops_.SetOutputTag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddRecvMessage(response_); - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; - grpc::protobuf::Message* const response_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer write_buf_; - CallOpBuffer writes_done_buf_; - CallOpBuffer finish_buf_; + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet write_ops_; + CallOpSet writes_done_ops_; + CallOpSet finish_ops_; }; // Client-side interface for bi-directional streaming. @@ -532,58 +534,58 @@ class ClientAsyncReaderWriter GRPC_FINAL const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&init_buf_); + init_ops_.SetOutputTag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.SetOutputTag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); + read_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.AddRecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SetOutputTag(tag); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_buf_.Reset(tag); - writes_done_buf_.AddClientSendClose(); - call_.PerformOps(&writes_done_buf_); + writes_done_ops_.SetOutputTag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer write_buf_; - CallOpBuffer writes_done_buf_; - CallOpBuffer finish_buf_; + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet write_ops_; + CallOpSet writes_done_ops_; + CallOpSet finish_ops_; }; template @@ -596,41 +598,41 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.SetOutputTag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.SetOutputTag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.IsOk()) { - finish_buf_.AddSendMessage(msg); + finish_ops_.SendMessage(msg); } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.IsOk()); - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -638,9 +640,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer finish_buf_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet finish_ops_; }; template @@ -653,30 +655,30 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.SetOutputTag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); + write_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -684,9 +686,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer write_buf_; - CallOpBuffer finish_buf_; + CallOpSet meta_ops_; + CallOpSet write_ops_; + CallOpSet finish_ops_; }; // Server-side interface for bi-directional streaming. @@ -701,36 +703,36 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.SetOutputTag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.SetOutputTag(tag); + read_ops_.AddRecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); + write_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.SendMessage(msg); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.SetOutputTag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -738,10 +740,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer write_buf_; - CallOpBuffer finish_buf_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet write_ops_; + CallOpSet finish_ops_; }; } // namespace grpc diff --git a/src/compiler/config.h b/src/compiler/config.h index e81de8d6c8f..06ccd8530c8 100644 --- a/src/compiler/config.h +++ b/src/compiler/config.h @@ -35,6 +35,7 @@ #define SRC_COMPILER_CONFIG_H #include +#include #ifndef GRPC_CUSTOM_DESCRIPTOR #include