Beginning the cleanup

pull/1969/head
Craig Tiller 10 years ago
parent da11694971
commit 81fafa8971
  1. 47
      include/grpc++/async_unary_call.h
  2. 9
      include/grpc++/completion_queue.h
  3. 45
      include/grpc++/config.h
  4. 75
      include/grpc++/config_protobuf.h
  5. 132
      include/grpc++/impl/call.h
  6. 27
      include/grpc++/impl/client_unary_call.h
  7. 37
      include/grpc++/impl/serialization_traits.h
  8. 36
      include/grpc++/impl/service_type.h
  9. 18
      include/grpc++/server.h
  10. 438
      include/grpc++/stream.h
  11. 1
      src/compiler/config.h

@ -58,40 +58,41 @@ template <class R>
class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
template <class W>
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const grpc::protobuf::Message& request)
const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
init_buf_.SendInitialMetadata(context->send_initial_metadata_);
init_buf_.SendMessage(request);
init_buf_.ClientSendClose();
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
meta_buf_.SetOutputTag(tag);
meta_buf_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
finish_buf_.Reset(tag);
finish_buf_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(msg);
finish_buf_.AddClientRecvStatus(context_, status);
finish_buf_.RecvMessage(msg);
finish_buf_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
private:
ClientContext* context_;
Call call_;
SneakyCallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_buf_;
CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
CallOpSet<CallOpRecvMessage<R>, CallOpClientRecvStatus> finish_buf_;
};
template <class W>
@ -104,34 +105,34 @@ class ServerAsyncResponseWriter GRPC_FINAL
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_buf_.SetOutputTag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_buf_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
finish_buf_.SendMessage(msg);
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
finish_buf_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -140,8 +141,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_buf_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_buf_;
};
} // namespace grpc

@ -35,7 +35,6 @@
#define GRPCXX_COMPLETION_QUEUE_H
#include <grpc/support/time.h>
#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/time.h>
@ -56,7 +55,10 @@ class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
class ChannelInterface;
class ClientContext;
class CompletionQueue;
class RpcMethod;
class Server;
class ServerBuilder;
class ServerContext;
@ -120,11 +122,12 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::ServerReaderWriter;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel,
const RpcMethod& method,
ClientContext* context,
const grpc::protobuf::Message& request,
grpc::protobuf::Message* result);
const InputMessage& request,
OutputMessage* result);
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);

@ -77,33 +77,6 @@
#define GRPC_OVERRIDE override
#endif
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
#endif
#ifndef GRPC_CUSTOM_MESSAGE
#include <google/protobuf/message.h>
#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
#endif
#ifndef GRPC_CUSTOM_STRING
#include <string>
#define GRPC_CUSTOM_STRING std::string
#endif
#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
::google::protobuf::io::ZeroCopyOutputStream
#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
::google::protobuf::io::ZeroCopyInputStream
#define GRPC_CUSTOM_CODEDINPUTSTREAM \
::google::protobuf::io::CodedInputStream
#endif
#ifdef GRPC_CXX0X_NO_NULLPTR
#include <memory>
const class {
@ -121,23 +94,15 @@ private:
} nullptr = {};
#endif
#ifndef GRPC_CUSTOM_STRING
#include <string>
#define GRPC_CUSTOM_STRING std::string
#endif
namespace grpc {
typedef GRPC_CUSTOM_STRING string;
namespace protobuf {
typedef GRPC_CUSTOM_MESSAGE Message;
typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
namespace io {
typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
} // namespace io
} // namespace protobuf
} // namespace grpc
#endif // GRPCXX_CONFIG_H

@ -0,0 +1,75 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_CONFIG_PROTOBUF_H
#define GRPCXX_CONFIG_PROTOBUF_H
#include <grpc++/impl/serialization_traits.h>
#ifndef GRPC_CUSTOM_PROTOBUF_INT64
#include <google/protobuf/stubs/common.h>
#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64
#endif
#ifndef GRPC_CUSTOM_MESSAGE
#include <google/protobuf/message.h>
#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message
#endif
#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \
::google::protobuf::io::ZeroCopyOutputStream
#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \
::google::protobuf::io::ZeroCopyInputStream
#define GRPC_CUSTOM_CODEDINPUTSTREAM \
::google::protobuf::io::CodedInputStream
#endif
namespace grpc {
namespace protobuf {
typedef GRPC_CUSTOM_MESSAGE Message;
typedef GRPC_CUSTOM_PROTOBUF_INT64 int64;
namespace io {
typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream;
typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream;
typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream;
} // namespace io
} // namespace protobuf
} // namespace grpc
#endif // GRPCXX_CONFIG_PROTOBUF_H

@ -50,6 +50,128 @@ namespace grpc {
class ByteBuffer;
class Call;
class CallNoOp {
protected:
void AddOp(grpc_op* ops, size_t* nops) {}
void FinishOp(void* tag, bool* status) {}
};
class CallOpSendInitialMetadata {
public:
void SendInitialMetadata(const std::multimap<grpc::string, grpc::string>& metadata);
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
class CallOpSendMessage {
public:
template <class M>
void SendMessage(const M& message);
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
template <class M>
class CallOpRecvMessage {
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
class CallOpGenericRecvMessage {
public:
template <class R>
void RecvMessage(R* message);
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
class CallOpClientSendClose {
public:
void ClientSendClose();
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
class CallOpServerSendStatus {
public:
void ServerSendStatus(const std::multimap<grpc::string, grpc::string>& trailing_metadata, const Status& status);
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
class CallOpRecvInitialMetadata {
public:
void RecvInitialMetadata(ClientContext* context);
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
class CallOpClientRecvStatus {
public:
void ClientRecvStatus(ClientContext* context, Status* status);
protected:
void AddOp(grpc_op* ops, size_t* nops);
void FinishOp(void* tag, bool* status);
};
class CallOpSetInterface : public CompletionQueueTag {
public:
virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
};
template <class T, int I>
class WrapAndDerive : public T {};
template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp>
class CallOpSet : public CallOpSetInterface,
public WrapAndDerive<Op1, 1>,
public WrapAndDerive<Op2, 2>,
public WrapAndDerive<Op3, 3>,
public WrapAndDerive<Op4, 4>,
public WrapAndDerive<Op5, 5>,
public WrapAndDerive<Op6, 6> {
public:
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
this->Op3::AddOp(ops, nops);
this->Op4::AddOp(ops, nops);
this->Op5::AddOp(ops, nops);
this->Op6::AddOp(ops, nops);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
this->Op1::FinishOp(*tag, status);
this->Op2::FinishOp(*tag, status);
this->Op3::FinishOp(*tag, status);
this->Op4::FinishOp(*tag, status);
this->Op5::FinishOp(*tag, status);
this->Op6::FinishOp(*tag, status);
*tag = return_tag_;
return true;
}
void SetOutputTag(void* return_tag) { return_tag_ = return_tag; }
private:
void *return_tag_;
};
#if 0
class CallOpBuffer : public CompletionQueueTag {
public:
CallOpBuffer();
@ -122,12 +244,14 @@ class CallOpBuffer : public CompletionQueueTag {
int cancelled_buf_;
bool* recv_closed_;
};
#endif
// SneakyCallOpBuffer does not post completions to the completion queue
class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
template <class Op1 = CallNoOp, class Op2 = CallNoOp, class Op3 = CallNoOp, class Op4 = CallNoOp, class Op5 = CallNoOp, class Op6 = CallNoOp>
class SneakyCallOpSet GRPC_FINAL : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
return CallOpBuffer::FinalizeResult(tag, status) && false;
return CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6>::FinalizeResult(tag, status) && false;
}
};
@ -135,7 +259,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
class CallHook {
public:
virtual ~CallHook() {}
virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0;
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};
// Straightforward wrapping of the C call object
@ -146,7 +270,7 @@ class Call GRPC_FINAL {
Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq,
int max_message_size);
void PerformOps(CallOpBuffer* buffer);
void PerformOps(CallOpSetInterface* ops);
grpc_call* call() { return call_; }
CompletionQueue* cq() { return cq_; }

@ -36,6 +36,8 @@
#include <grpc++/config.h>
#include <grpc++/impl/call.h>
namespace grpc {
class ChannelInterface;
@ -45,10 +47,31 @@ class RpcMethod;
class Status;
// Wrapper that performs a blocking unary call
template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const grpc::protobuf::Message& request,
grpc::protobuf::Message* result);
const InputMessage& request,
OutputMessage* result) {
CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq));
CallOpSet<
CallOpSendInitialMetadata,
CallOpSendMessage,
CallOpRecvInitialMetadata,
CallOpRecvMessage<OutputMessage>,
CallOpClientSendClose,
CallOpClientRecvStatus> ops;
Status status;
ops.AddSendInitialMetadata(context);
ops.AddSendMessage(request);
ops.AddRecvInitialMetadata(context);
ops.AddRecvMessage(result);
ops.AddClientSendClose();
ops.AddClientRecvStatus(context, &status);
call.PerformOps(&ops);
GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk());
return status;
}
} // namespace grpc

@ -31,34 +31,19 @@
*
*/
#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/call.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H
#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H
struct grpc_byte_buffer;
namespace grpc {
// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const grpc::protobuf::Message& request,
grpc::protobuf::Message* result) {
CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf;
Status status;
buf.AddSendInitialMetadata(context);
buf.AddSendMessage(request);
buf.AddRecvInitialMetadata(context);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
buf.AddClientRecvStatus(context, &status);
call.PerformOps(&buf);
GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
return status;
}
template <class Message>
class SerializationTraits;
typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest);
typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst);
} // namespace grpc
#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H

@ -35,6 +35,8 @@
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/config.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/server.h>
namespace grpc {
@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface {
class AsynchronousService {
public:
// this is Server, but in disguise to avoid a link dependency
class DispatchImpl {
public:
virtual void RequestAsyncCall(void* registered_method,
ServerContext* context,
::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) = 0;
};
AsynchronousService(const char** method_names, size_t method_count)
: dispatch_impl_(nullptr),
: server_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
@ -86,42 +76,44 @@ class AsynchronousService {
~AsynchronousService() { delete[] request_args_; }
protected:
template <class Message>
void RequestAsyncUnary(int index, ServerContext* context,
grpc::protobuf::Message* request,
Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
stream, call_cq, notification_cq, tag);
server_->RequestAsyncCall(request_args_[index], context,
stream, call_cq, notification_cq, tag, request);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
server_->RequestAsyncCall(request_args_[index], context,
stream, call_cq, notification_cq, tag);
}
template <class Message>
void RequestServerStreaming(int index, ServerContext* context,
grpc::protobuf::Message* request,
Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
stream, call_cq, notification_cq, tag);
server_->RequestAsyncCall(request_args_[index], context,
stream, call_cq, notification_cq, tag, request);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
server_->RequestAsyncCall(request_args_[index], context,
stream, call_cq, notification_cq, tag);
}
private:
friend class Server;
DispatchImpl* dispatch_impl_;
Server* server_;
const char** const method_names_;
size_t method_count_;
void** request_args_;

@ -53,13 +53,13 @@ class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
class ServerAsyncStreamingInterface;
class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
class Server GRPC_FINAL : public GrpcLibrary,
private CallHook,
private AsynchronousService::DispatchImpl {
private CallHook {
public:
~Server();
@ -73,6 +73,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
private:
friend class AsyncGenericService;
friend class AsynchronousService;
friend class ServerBuilder;
class SyncRequest;
@ -96,15 +97,20 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RunRpc();
void ScheduleCallback();
void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
void PerformOpsOnCall(CallOpSetInterface *ops, Call* call) GRPC_OVERRIDE;
// DispatchImpl
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) GRPC_OVERRIDE;
void* tag, Message *message);
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag);
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,

@ -93,15 +93,16 @@ template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
template <class W>
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const grpc::protobuf::Message& request)
ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf;
buf.AddSendInitialMetadata(&context->send_initial_metadata_);
buf.AddSendMessage(request);
buf.AddClientSendClose();
call_.PerformOps(&buf);
cq_.Pluck(&buf);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendMessage(request);
ops.ClientSendClose();
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@ -111,28 +112,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
cq_.Pluck(&buf); // status ignored
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(context_);
ops.RecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf) && buf.got_message;
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@ -150,48 +151,48 @@ class ClientWriterInterface : public ClientStreamingInterface,
};
template <class W>
class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> {
class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
template <class R>
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, grpc::protobuf::Message* response)
ClientContext* context, R* response)
: context_(context),
response_(response),
call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf;
buf.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&buf);
cq_.Pluck(&buf);
finish_ops_.RecvMessage(response);
CallOpSet<CallOpRecvMessage<R>> ops;
ops.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpSendMessage> ops;
ops.SendMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
Status status;
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
finish_ops_.ClientRecvStatus(context_, &status);
call_.PerformOps(&finish_ops_);
GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
grpc::protobuf::Message* const response_;
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
@ -213,10 +214,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf;
buf.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&buf);
cq_.Pluck(&buf);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
@ -226,42 +227,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
cq_.Pluck(&buf); // status ignored
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(context_);
ops.RecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf) && buf.got_message;
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpSendMessage> ops;
ops.SendMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(&buf);
return cq_.Pluck(&buf);
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
@ -279,18 +280,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf) && buf.got_message;
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
@ -306,22 +307,22 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf);
ops.SendMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
@ -339,29 +340,29 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf) && buf.got_message;
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpBuffer buf;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
buf.AddSendMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf);
ops.SendMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
@ -407,50 +408,51 @@ template <class R>
class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
// Create a stream and write the first request out.
template <class W>
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const grpc::protobuf::Message& request, void* tag)
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
call_.PerformOps(&init_buf_);
init_ops_.SetOutputTag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendMessage(request);
init_ops_.ClientSendClose();
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
meta_ops_.SetOutputTag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(context_);
read_ops_.RecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_buf_.Reset(tag);
finish_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_ops_.RecvInitialMetadata(context_);
}
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W>
@ -463,56 +465,56 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
template <class R>
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
grpc::protobuf::Message* response, void* tag)
R* response, void* tag)
: context_(context),
response_(response),
call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&init_buf_);
finish_ops_.RecvMessage(response);
init_ops_.SetOutputTag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
meta_ops_.SetOutputTag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
write_ops_.SetOutputTag(tag);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
writes_done_ops_.SetOutputTag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_buf_.Reset(tag);
finish_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_ops_.RecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(response_);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
grpc::protobuf::Message* const response_;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpSendMessage> write_ops_;
CallOpSet<CallOpClientSendClose> writes_done_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
};
// Client-side interface for bi-directional streaming.
@ -532,58 +534,58 @@ class ClientAsyncReaderWriter GRPC_FINAL
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
call_.PerformOps(&init_buf_);
init_ops_.SetOutputTag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
meta_ops_.SetOutputTag(tag);
meta_ops_.RecvInitialMetadata(context_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(context_);
read_ops_.RecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.AddRecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
write_ops_.SetOutputTag(tag);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
writes_done_ops_.SetOutputTag(tag);
writes_done_ops_.ClientSendClose();
call_.PerformOps(&writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
finish_buf_.Reset(tag);
finish_ops_.SetOutputTag(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
finish_ops_.RecvInitialMetadata(context_);
}
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ClientRecvStatus(context_, status);
call_.PerformOps(&finish_ops_);
}
private:
ClientContext* context_;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendMessage> write_ops_;
CallOpSet<CallOpClientSendClose> writes_done_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
};
template <class W, class R>
@ -596,41 +598,41 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_ops_.SetOutputTag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.SetOutputTag(tag);
read_ops_.RecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
finish_ops_.SendMessage(msg);
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
private:
@ -638,9 +640,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> finish_ops_;
};
template <class W>
@ -653,30 +655,30 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_ops_.SetOutputTag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
call_.PerformOps(&meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
private:
@ -684,9 +686,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
// Server-side interface for bi-directional streaming.
@ -701,36 +703,36 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
meta_ops_.SetOutputTag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
call_.PerformOps(&meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
read_buf_.Reset(tag);
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
read_ops_.SetOutputTag(tag);
read_ops_.AddRecvMessage(msg);
call_.PerformOps(&read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_buf_.Reset(tag);
write_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
write_ops_.SendMessage(msg);
call_.PerformOps(&write_ops_);
}
void Finish(const Status& status, void* tag) {
finish_buf_.Reset(tag);
finish_ops_.SetOutputTag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_ops_);
}
private:
@ -738,10 +740,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer finish_buf_;
CallOpSet<CallOpSendInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
};
} // namespace grpc

@ -35,6 +35,7 @@
#define SRC_COMPILER_CONFIG_H
#include <grpc++/config.h>
#include <grpc++/config_protobuf.h>
#ifndef GRPC_CUSTOM_DESCRIPTOR
#include <google/protobuf/descriptor.h>

Loading…
Cancel
Save