Construction of streams shouldn't require triggering async ops

pull/12269/head
Vijay Pai 8 years ago
parent c3aba91be9
commit 4b047a3bff
  1. 12
      examples/cpp/helloworld/greeter_async_client.cc
  2. 14
      examples/cpp/helloworld/greeter_async_client2.cc
  3. 7
      include/grpc++/generic/generic_stub.h
  4. 136
      include/grpc++/impl/codegen/async_stream.h
  5. 41
      include/grpc++/impl/codegen/async_unary_call.h
  6. 296
      src/compiler/cpp_generator.cc
  7. 23
      src/cpp/client/generic_stub.cc
  8. 40
      test/cpp/codegen/compiler_test_golden
  9. 5
      test/cpp/codegen/compiler_test_mock_golden
  10. 143
      test/cpp/qps/client_async.cc

@ -60,11 +60,15 @@ class GreeterClient {
// Storage for the status of the RPC upon completion. // Storage for the status of the RPC upon completion.
Status status; Status status;
// stub_->AsyncSayHello() performs the RPC call, returning an instance we // stub_->PrepareAsyncSayHello() creates an RPC object, returning
// store in "rpc". Because we are using the asynchronous API, we need to // an instance to store in "call" but does not actually start the RPC
// hold on to the "rpc" instance in order to get updates on the ongoing RPC. // Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
stub_->AsyncSayHello(&context, request, &cq)); stub_->PrepareAsyncSayHello(&context, request, &cq));
// StartCall initiates the RPC call
rpc->StartCall();
// Request that, upon completion of the RPC, "reply" be updated with the // Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation // server's response; "status" with the indication of whether the operation

@ -49,11 +49,15 @@ class GreeterClient {
// Call object to store rpc data // Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall; AsyncClientCall* call = new AsyncClientCall;
// stub_->AsyncSayHello() performs the RPC call, returning an instance to // stub_->PrepareAsyncSayHello() creates an RPC object, returning
// store in "call". Because we are using the asynchronous API, we need to // an instance to store in "call" but does not actually start the RPC
// hold on to the "call" instance in order to get updates on the ongoing RPC. // Because we are using the asynchronous API, we need to hold on to
call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_); // the "call" instance in order to get updates on the ongoing RPC.
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
// StartCall initiates the RPC call
call->response_reader->StartCall();
// Request that, upon completion of the RPC, "reply" be updated with the // Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation // server's response; "status" with the indication of whether the operation

@ -44,6 +44,13 @@ class GenericStub final {
ClientContext* context, const grpc::string& method, CompletionQueue* cq, ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag); void* tag);
/// Setup a call to a named method \a method using \a context, but don't
/// start it. Let it be started explicitly with StartCall and a tag.
/// The return value only indicates whether or not registration of the call
/// succeeded (i.e. the call won't proceed if the return value is nullptr).
std::unique_ptr<GenericClientAsyncReaderWriter> PrepareCall(
ClientContext* context, const grpc::string& method, CompletionQueue* cq);
private: private:
std::shared_ptr<ChannelInterface> channel_; std::shared_ptr<ChannelInterface> channel_;
}; };

@ -35,6 +35,11 @@ class ClientAsyncStreamingInterface {
public: public:
virtual ~ClientAsyncStreamingInterface() {} virtual ~ClientAsyncStreamingInterface() {}
/// Start the call that was set up by the constructor, but only if the
/// constructor was invoked through the "Prepare" API which doesn't actually
/// start the call
virtual void StartCall(void* tag) = 0;
/// Request notification of the reading of the initial metadata. Completion /// Request notification of the reading of the initial metadata. Completion
/// will be notified by \a tag on the associated completion queue. /// will be notified by \a tag on the associated completion queue.
/// This call is optional, but if it is used, it cannot be used concurrently /// This call is optional, but if it is used, it cannot be used concurrently
@ -156,20 +161,22 @@ class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
template <class R> template <class R>
class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
public: public:
/// Create a stream and write the first request out. /// Create a stream object.
/// Write the first request out if \a start is set.
/// \a tag will be notified on \a cq when the call has been started and /// \a tag will be notified on \a cq when the call has been started and
/// \a request has been written out. /// \a request has been written out. If \a start is not set, \a tag must be
/// nullptr and the actual call must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata /// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call. /// used to send to the server when starting the call.
template <class W> template <class W>
static ClientAsyncReader* Create(ChannelInterface* channel, static ClientAsyncReader* Create(ChannelInterface* channel,
CompletionQueue* cq, const RpcMethod& method, CompletionQueue* cq, const RpcMethod& method,
ClientContext* context, const W& request, ClientContext* context, const W& request,
void* tag) { bool start, void* tag) {
Call call = channel->CreateCall(method, context, cq); Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc( return new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncReader))) call.call(), sizeof(ClientAsyncReader)))
ClientAsyncReader(call, context, request, tag); ClientAsyncReader(call, context, request, start, tag);
} }
// always allocated against a call arena, no memory free required // always allocated against a call arena, no memory free required
@ -177,6 +184,12 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
assert(size == sizeof(ClientAsyncReader)); assert(size == sizeof(ClientAsyncReader));
} }
void StartCall(void* tag) override {
assert(!started_);
started_ = true;
StartCallInternal(tag);
}
/// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata
/// method for semantics. /// method for semantics.
/// ///
@ -186,6 +199,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
/// calling code can access the received metadata through the /// calling code can access the received metadata through the
/// \a ClientContext. /// \a ClientContext.
void ReadInitialMetadata(void* tag) override { void ReadInitialMetadata(void* tag) override {
assert(started_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
@ -194,6 +208,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
} }
void Read(R* msg, void* tag) override { void Read(R* msg, void* tag) override {
assert(started_);
read_ops_.set_output_tag(tag); read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
read_ops_.RecvInitialMetadata(context_); read_ops_.RecvInitialMetadata(context_);
@ -208,6 +223,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
/// - the \a ClientContext associated with this call is updated with /// - the \a ClientContext associated with this call is updated with
/// possible initial and trailing metadata received from the server. /// possible initial and trailing metadata received from the server.
void Finish(Status* status, void* tag) override { void Finish(Status* status, void* tag) override {
assert(started_);
finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_); finish_ops_.RecvInitialMetadata(context_);
@ -219,19 +235,28 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
private: private:
template <class W> template <class W>
ClientAsyncReader(Call call, ClientContext* context, const W& request, ClientAsyncReader(Call call, ClientContext* context, const W& request,
void* tag) bool start, void* tag)
: context_(context), call_(call) { : context_(context), call_(call), started_(start) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose(); init_ops_.ClientSendClose();
if (start) {
StartCallInternal(tag);
} else {
assert(tag == nullptr);
}
}
void StartCallInternal(void* tag) {
init_ops_.SendInitialMetadata(context_->send_initial_metadata_,
context_->initial_metadata_flags());
init_ops_.set_output_tag(tag);
call_.PerformOps(&init_ops_); call_.PerformOps(&init_ops_);
} }
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
bool started_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
init_ops_; init_ops_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
@ -257,9 +282,12 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
template <class W> template <class W>
class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
public: public:
/// Create a stream and write the first request out. /// Create a stream object.
/// Start the RPC if \a start is set
/// \a tag will be notified on \a cq when the call has been started (i.e. /// \a tag will be notified on \a cq when the call has been started (i.e.
/// intitial metadata sent) and \a request has been written out. /// intitial metadata sent) and \a request has been written out.
/// If \a start is not set, \a tag must be nullptr and the actual call
/// must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata /// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call. /// used to send to the server when starting the call.
/// \a response will be filled in with the single expected response /// \a response will be filled in with the single expected response
@ -269,11 +297,11 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
static ClientAsyncWriter* Create(ChannelInterface* channel, static ClientAsyncWriter* Create(ChannelInterface* channel,
CompletionQueue* cq, const RpcMethod& method, CompletionQueue* cq, const RpcMethod& method,
ClientContext* context, R* response, ClientContext* context, R* response,
void* tag) { bool start, void* tag) {
Call call = channel->CreateCall(method, context, cq); Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc( return new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncWriter))) call.call(), sizeof(ClientAsyncWriter)))
ClientAsyncWriter(call, context, response, tag); ClientAsyncWriter(call, context, response, start, tag);
} }
// always allocated against a call arena, no memory free required // always allocated against a call arena, no memory free required
@ -281,6 +309,12 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
assert(size == sizeof(ClientAsyncWriter)); assert(size == sizeof(ClientAsyncWriter));
} }
void StartCall(void* tag) override {
assert(!started_);
started_ = true;
StartCallInternal(tag);
}
/// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for
/// semantics. /// semantics.
/// ///
@ -289,6 +323,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
/// associated with this call is updated, and the calling code can access /// associated with this call is updated, and the calling code can access
/// the received metadata through the \a ClientContext. /// the received metadata through the \a ClientContext.
void ReadInitialMetadata(void* tag) override { void ReadInitialMetadata(void* tag) override {
assert(started_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
@ -297,6 +332,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
} }
void Write(const W& msg, void* tag) override { void Write(const W& msg, void* tag) override {
assert(started_);
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
@ -304,6 +340,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
} }
void Write(const W& msg, WriteOptions options, void* tag) override { void Write(const W& msg, WriteOptions options, void* tag) override {
assert(started_);
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (options.is_last_message()) { if (options.is_last_message()) {
options.set_buffer_hint(); options.set_buffer_hint();
@ -315,6 +352,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
} }
void WritesDone(void* tag) override { void WritesDone(void* tag) override {
assert(started_);
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
write_ops_.ClientSendClose(); write_ops_.ClientSendClose();
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
@ -328,6 +366,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
/// - attempts to fill in the \a response parameter passed to this class's /// - attempts to fill in the \a response parameter passed to this class's
/// constructor with the server's response message. /// constructor with the server's response message.
void Finish(Status* status, void* tag) override { void Finish(Status* status, void* tag) override {
assert(started_);
finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_); finish_ops_.RecvInitialMetadata(context_);
@ -338,25 +377,32 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
private: private:
template <class R> template <class R>
ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) ClientAsyncWriter(Call call, ClientContext* context, R* response, bool start,
: context_(context), call_(call) { void* tag)
: context_(context), call_(call), started_(start) {
finish_ops_.RecvMessage(response); finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage(); finish_ops_.AllowNoMessage();
// if corked bit is set in context, we buffer up the initial metadata to if (start) {
// coalesce with later message to be sent. No op is performed. StartCallInternal(tag);
if (context_->initial_metadata_corked_) {
write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
} else { } else {
assert(tag == nullptr);
}
}
void StartCallInternal(void* tag) {
write_ops_.SendInitialMetadata(context_->send_initial_metadata_,
context_->initial_metadata_flags());
// if corked bit is set in context, we just keep the initial metadata
// buffered up to coalesce with later message send. No op is performed.
if (!context_->initial_metadata_corked_) {
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
} }
} }
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
bool started_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
write_ops_; write_ops_;
@ -388,20 +434,23 @@ template <class W, class R>
class ClientAsyncReaderWriter final class ClientAsyncReaderWriter final
: public ClientAsyncReaderWriterInterface<W, R> { : public ClientAsyncReaderWriterInterface<W, R> {
public: public:
/// Create a stream and write the first request out. /// Create a stream object.
/// Start the RPC request if \a start is set.
/// \a tag will be notified on \a cq when the call has been started (i.e. /// \a tag will be notified on \a cq when the call has been started (i.e.
/// intitial metadata sent). /// intitial metadata sent). If \a start is not set, \a tag must be
/// nullptr and the actual call must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata /// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call. /// used to send to the server when starting the call.
static ClientAsyncReaderWriter* Create(ChannelInterface* channel, static ClientAsyncReaderWriter* Create(ChannelInterface* channel,
CompletionQueue* cq, CompletionQueue* cq,
const RpcMethod& method, const RpcMethod& method,
ClientContext* context, void* tag) { ClientContext* context, bool start,
void* tag) {
Call call = channel->CreateCall(method, context, cq); Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc( return new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncReaderWriter))) call.call(), sizeof(ClientAsyncReaderWriter)))
ClientAsyncReaderWriter(call, context, tag); ClientAsyncReaderWriter(call, context, start, tag);
} }
// always allocated against a call arena, no memory free required // always allocated against a call arena, no memory free required
@ -409,6 +458,12 @@ class ClientAsyncReaderWriter final
assert(size == sizeof(ClientAsyncReaderWriter)); assert(size == sizeof(ClientAsyncReaderWriter));
} }
void StartCall(void* tag) override {
assert(!started_);
started_ = true;
StartCallInternal(tag);
}
/// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method
/// for semantics of this method. /// for semantics of this method.
/// ///
@ -417,6 +472,7 @@ class ClientAsyncReaderWriter final
/// is updated with it, and then the receiving initial metadata can /// is updated with it, and then the receiving initial metadata can
/// be accessed through this \a ClientContext. /// be accessed through this \a ClientContext.
void ReadInitialMetadata(void* tag) override { void ReadInitialMetadata(void* tag) override {
assert(started_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag); meta_ops_.set_output_tag(tag);
@ -425,6 +481,7 @@ class ClientAsyncReaderWriter final
} }
void Read(R* msg, void* tag) override { void Read(R* msg, void* tag) override {
assert(started_);
read_ops_.set_output_tag(tag); read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
read_ops_.RecvInitialMetadata(context_); read_ops_.RecvInitialMetadata(context_);
@ -434,6 +491,7 @@ class ClientAsyncReaderWriter final
} }
void Write(const W& msg, void* tag) override { void Write(const W& msg, void* tag) override {
assert(started_);
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
@ -441,6 +499,7 @@ class ClientAsyncReaderWriter final
} }
void Write(const W& msg, WriteOptions options, void* tag) override { void Write(const W& msg, WriteOptions options, void* tag) override {
assert(started_);
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (options.is_last_message()) { if (options.is_last_message()) {
options.set_buffer_hint(); options.set_buffer_hint();
@ -452,6 +511,7 @@ class ClientAsyncReaderWriter final
} }
void WritesDone(void* tag) override { void WritesDone(void* tag) override {
assert(started_);
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
write_ops_.ClientSendClose(); write_ops_.ClientSendClose();
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
@ -462,6 +522,7 @@ class ClientAsyncReaderWriter final
/// - the \a ClientContext associated with this call is updated with /// - the \a ClientContext associated with this call is updated with
/// possible initial and trailing metadata sent from the server. /// possible initial and trailing metadata sent from the server.
void Finish(Status* status, void* tag) override { void Finish(Status* status, void* tag) override {
assert(started_);
finish_ops_.set_output_tag(tag); finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
finish_ops_.RecvInitialMetadata(context_); finish_ops_.RecvInitialMetadata(context_);
@ -471,23 +532,30 @@ class ClientAsyncReaderWriter final
} }
private: private:
ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) ClientAsyncReaderWriter(Call call, ClientContext* context, bool start,
: context_(context), call_(call) { void* tag)
if (context_->initial_metadata_corked_) { : context_(context), call_(call), started_(start) {
// if corked bit is set in context, we buffer up the initial metadata to if (start) {
// coalesce with later message to be sent. No op is performed. StartCallInternal(tag);
write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
} else { } else {
assert(tag == nullptr);
}
}
void StartCallInternal(void* tag) {
write_ops_.SendInitialMetadata(context_->send_initial_metadata_,
context_->initial_metadata_flags());
// if corked bit is set in context, we just keep the initial metadata
// buffered up to coalesce with later message send. No op is performed.
if (!context_->initial_metadata_corked_) {
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
write_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
} }
} }
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
bool started_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>

@ -32,13 +32,18 @@ namespace grpc {
class CompletionQueue; class CompletionQueue;
extern CoreCodegenInterface* g_core_codegen_interface; extern CoreCodegenInterface* g_core_codegen_interface;
/// An interface relevant for async client side unary RPCS (which send /// An interface relevant for async client side unary RPCs (which send
/// one request message to a server and receive one response message). /// one request message to a server and receive one response message).
template <class R> template <class R>
class ClientAsyncResponseReaderInterface { class ClientAsyncResponseReaderInterface {
public: public:
virtual ~ClientAsyncResponseReaderInterface() {} virtual ~ClientAsyncResponseReaderInterface() {}
/// Start the call that was set up by the constructor, but only if the
/// constructor was invoked through the "Prepare" API which doesn't actually
/// start the call
virtual void StartCall() = 0;
/// Request notification of the reading of initial metadata. Completion /// Request notification of the reading of initial metadata. Completion
/// will be notified by \a tag on the associated completion queue. /// will be notified by \a tag on the associated completion queue.
/// This call is optional, but if it is used, it cannot be used concurrently /// This call is optional, but if it is used, it cannot be used concurrently
@ -70,9 +75,10 @@ template <class R>
class ClientAsyncResponseReader final class ClientAsyncResponseReader final
: public ClientAsyncResponseReaderInterface<R> { : public ClientAsyncResponseReaderInterface<R> {
public: public:
/// Start a call and write the request out. /// Start a call and write the request out if \a start is set.
/// \a tag will be notified on \a cq when the call has been started (i.e. /// \a tag will be notified on \a cq when the call has been started (i.e.
/// intitial metadata sent) and \a request has been written out. /// intitial metadata sent) and \a request has been written out.
/// If \a start is not set, the actual call must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata /// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call. /// used to send to the server when starting the call.
template <class W> template <class W>
@ -80,11 +86,11 @@ class ClientAsyncResponseReader final
CompletionQueue* cq, CompletionQueue* cq,
const RpcMethod& method, const RpcMethod& method,
ClientContext* context, ClientContext* context,
const W& request) { const W& request, bool start) {
Call call = channel->CreateCall(method, context, cq); Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc( return new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncResponseReader))) call.call(), sizeof(ClientAsyncResponseReader)))
ClientAsyncResponseReader(call, context, request); ClientAsyncResponseReader(call, context, request, start);
} }
// always allocated against a call arena, no memory free required // always allocated against a call arena, no memory free required
@ -92,13 +98,20 @@ class ClientAsyncResponseReader final
assert(size == sizeof(ClientAsyncResponseReader)); assert(size == sizeof(ClientAsyncResponseReader));
} }
void StartCall() override {
assert(!started_);
started_ = true;
StartCallInternal();
}
/// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
/// semantics. /// semantics.
/// ///
/// Side effect: /// Side effect:
/// - the \a ClientContext associated with this call is updated with /// - the \a ClientContext associated with this call is updated with
/// possible initial and trailing metadata sent from the server. /// possible initial and trailing metadata sent from the server.
void ReadInitialMetadata(void* tag) { void ReadInitialMetadata(void* tag) override {
assert(started_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_buf.set_output_tag(tag); meta_buf.set_output_tag(tag);
@ -111,7 +124,8 @@ class ClientAsyncResponseReader final
/// Side effect: /// Side effect:
/// - the \a ClientContext associated with this call is updated with /// - the \a ClientContext associated with this call is updated with
/// possible initial and trailing metadata sent from the server. /// possible initial and trailing metadata sent from the server.
void Finish(R* msg, Status* status, void* tag) { void Finish(R* msg, Status* status, void* tag) override {
assert(started_);
finish_buf.set_output_tag(tag); finish_buf.set_output_tag(tag);
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
finish_buf.RecvInitialMetadata(context_); finish_buf.RecvInitialMetadata(context_);
@ -125,15 +139,22 @@ class ClientAsyncResponseReader final
private: private:
ClientContext* const context_; ClientContext* const context_;
Call call_; Call call_;
bool started_;
template <class W> template <class W>
ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) ClientAsyncResponseReader(Call call, ClientContext* context, const W& request,
: context_(context), call_(call) { bool start)
init_buf.SendInitialMetadata(context->send_initial_metadata_, : context_(context), call_(call), started_(start) {
context->initial_metadata_flags()); // Bind the metadata at time of StartCallInternal but set up the rest here
// TODO(ctiller): don't assert // TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(init_buf.SendMessage(request).ok()); GPR_CODEGEN_ASSERT(init_buf.SendMessage(request).ok());
init_buf.ClientSendClose(); init_buf.ClientSendClose();
if (start) StartCallInternal();
}
void StartCallInternal() {
init_buf.SendInitialMetadata(context_->send_initial_metadata_,
context_->initial_metadata_flags());
call_.PerformOps(&init_buf); call_.PerformOps(&init_buf);
} }

@ -165,25 +165,37 @@ void PrintHeaderClientMethodInterfaces(
(*vars)["Request"] = method->input_type_name(); (*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name(); (*vars)["Response"] = method->output_type_name();
struct {
grpc::string prefix;
grpc::string method_params; // extra arguments to method
grpc::string raw_args; // extra arguments to raw version of method
} async_prefixes[] = {{"Async", ", void* tag", ", tag"},
{"PrepareAsync", "", ""}};
if (is_public) { if (is_public) {
if (method->NoStreaming()) { if (method->NoStreaming()) {
printer->Print( printer->Print(
*vars, *vars,
"virtual ::grpc::Status $Method$(::grpc::ClientContext* context, " "virtual ::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) = 0;\n"); "const $Request$& request, $Response$* response) = 0;\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
printer->Print(
*vars,
"std::unique_ptr< " "std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>> " "::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Indent(); printer->Indent();
printer->Print(*vars, printer->Print(
*vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" "::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq));\n"); "$AsyncPrefix$$Method$Raw(context, request, cq));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
}
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -197,19 +209,26 @@ void PrintHeaderClientMethodInterfaces(
"($Method$Raw(context, response));\n"); "($Method$Raw(context, response));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print( printer->Print(
*vars, *vars,
"std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>"
" Async$Method$(::grpc::ClientContext* context, $Response$* " " $AsyncPrefix$$Method$(::grpc::ClientContext* context, "
"$Response$* "
"response, " "response, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Indent(); printer->Indent();
printer->Print(*vars, printer->Print(*vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncWriterInterface< $Request$>>(" "::grpc::ClientAsyncWriterInterface< $Request$>>("
"Async$Method$Raw(context, response, cq, tag));\n"); "$AsyncPrefix$$Method$Raw(context, response, "
"cq$AsyncRawArgs$));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
}
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -223,19 +242,25 @@ void PrintHeaderClientMethodInterfaces(
"($Method$Raw(context, request));\n"); "($Method$Raw(context, request));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print( printer->Print(
*vars, *vars,
"std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> " "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> "
"Async$Method$(" "$AsyncPrefix$$Method$("
"::grpc::ClientContext* context, const $Request$& request, " "::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Indent(); printer->Indent();
printer->Print(*vars, printer->Print(
*vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncReaderInterface< $Response$>>(" "::grpc::ClientAsyncReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n"); "$AsyncPrefix$$Method$Raw(context, request, cq$AsyncRawArgs$));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
}
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print(*vars, printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientReaderWriterInterface< " "std::unique_ptr< ::grpc::ClientReaderWriterInterface< "
@ -249,61 +274,83 @@ void PrintHeaderClientMethodInterfaces(
"$Method$Raw(context));\n"); "$Method$Raw(context));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print( printer->Print(
*vars, *vars,
"std::unique_ptr< " "std::unique_ptr< "
"::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> " "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Indent(); printer->Indent();
printer->Print( printer->Print(
*vars, *vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>(" "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>("
"Async$Method$Raw(context, cq, tag));\n"); "$AsyncPrefix$$Method$Raw(context, cq$AsyncRawArgs$));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
} }
}
} else { } else {
if (method->NoStreaming()) { if (method->NoStreaming()) {
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
printer->Print( printer->Print(
*vars, *vars,
"virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq) = 0;\n"); "::grpc::CompletionQueue* cq) = 0;\n");
}
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
"virtual ::grpc::ClientWriterInterface< $Request$>*" "virtual ::grpc::ClientWriterInterface< $Request$>*"
" $Method$Raw(" " $Method$Raw("
"::grpc::ClientContext* context, $Response$* response) = 0;\n"); "::grpc::ClientContext* context, $Response$* response) = 0;\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
printer->Print(
*vars,
"virtual ::grpc::ClientAsyncWriterInterface< $Request$>*" "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*"
" Async$Method$Raw(::grpc::ClientContext* context, " " $AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, "
"$Response$* response, " "$Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n");
}
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
"virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw(" "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) = 0;\n"); "::grpc::ClientContext* context, const $Request$& request) = 0;\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
printer->Print( printer->Print(
*vars, *vars,
"virtual ::grpc::ClientAsyncReaderInterface< $Response$>* " "virtual ::grpc::ClientAsyncReaderInterface< $Response$>* "
"Async$Method$Raw(" "$AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request, " "::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n");
}
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print(*vars, printer->Print(*vars,
"virtual ::grpc::ClientReaderWriterInterface< $Request$, " "virtual ::grpc::ClientReaderWriterInterface< $Request$, "
"$Response$>* " "$Response$>* "
"$Method$Raw(::grpc::ClientContext* context) = 0;\n"); "$Method$Raw(::grpc::ClientContext* context) = 0;\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
printer->Print(
*vars,
"virtual ::grpc::ClientAsyncReaderWriterInterface< " "virtual ::grpc::ClientAsyncReaderWriterInterface< "
"$Request$, $Response$>* " "$Request$, $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n");
}
} }
} }
} }
@ -315,25 +362,35 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer,
(*vars)["Method"] = method->name(); (*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name(); (*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name(); (*vars)["Response"] = method->output_type_name();
struct {
grpc::string prefix;
grpc::string method_params; // extra arguments to method
grpc::string raw_args; // extra arguments to raw version of method
} async_prefixes[] = {{"Async", ", void* tag", ", tag"},
{"PrepareAsync", "", ""}};
if (is_public) { if (is_public) {
if (method->NoStreaming()) { if (method->NoStreaming()) {
printer->Print( printer->Print(
*vars, *vars,
"::grpc::Status $Method$(::grpc::ClientContext* context, " "::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) override;\n"); "const $Request$& request, $Response$* response) override;\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
printer->Print( printer->Print(
*vars, *vars,
"std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Indent(); printer->Indent();
printer->Print(*vars, printer->Print(*vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncResponseReader< $Response$>>(" "::grpc::ClientAsyncResponseReader< $Response$>>("
"Async$Method$Raw(context, request, cq));\n"); "$AsyncPrefix$$Method$Raw(context, request, cq));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
}
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -346,18 +403,24 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer,
"($Method$Raw(context, response));\n"); "($Method$Raw(context, response));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print(*vars, printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>" "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>"
" Async$Method$(::grpc::ClientContext* context, " " $AsyncPrefix$$Method$(::grpc::ClientContext* context, "
"$Response$* response, " "$Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Indent(); printer->Indent();
printer->Print( printer->Print(
*vars, *vars,
"return std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>(" "return std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>("
"Async$Method$Raw(context, response, cq, tag));\n"); "$AsyncPrefix$$Method$Raw(context, response, "
"cq$AsyncRawArgs$));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
}
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -371,19 +434,24 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer,
"($Method$Raw(context, request));\n"); "($Method$Raw(context, request));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print( printer->Print(
*vars, *vars,
"std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> " "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> "
"Async$Method$(" "$AsyncPrefix$$Method$("
"::grpc::ClientContext* context, const $Request$& request, " "::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Indent(); printer->Indent();
printer->Print( printer->Print(
*vars, *vars,
"return std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>>(" "return std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n"); "$AsyncPrefix$$Method$Raw(context, request, cq$AsyncRawArgs$));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
}
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print( printer->Print(
*vars, *vars,
@ -396,53 +464,80 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer,
"$Method$Raw(context));\n"); "$Method$Raw(context));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print(*vars, printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>> " "$Request$, $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Indent(); printer->Indent();
printer->Print(*vars, printer->Print(
*vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>(" "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>("
"Async$Method$Raw(context, cq, tag));\n"); "$AsyncPrefix$$Method$Raw(context, cq$AsyncRawArgs$));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
} }
}
} else { } else {
if (method->NoStreaming()) { if (method->NoStreaming()) {
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
printer->Print(
*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* " "::grpc::ClientAsyncResponseReader< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq) override;\n"); "::grpc::CompletionQueue* cq) override;\n");
}
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* $Method$Raw(" "::grpc::ClientWriter< $Request$>* $Method$Raw("
"::grpc::ClientContext* context, $Response$* response) " "::grpc::ClientContext* context, $Response$* response) "
"override;\n"); "override;\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
"::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw(" (*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print(
*vars,
"::grpc::ClientAsyncWriter< $Request$>* $AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, " "::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) override;\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n");
}
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"::grpc::ClientReader< $Response$>* $Method$Raw(" "::grpc::ClientReader< $Response$>* $Method$Raw("
"::grpc::ClientContext* context, const $Request$& request)" "::grpc::ClientContext* context, const $Request$& request)"
" override;\n"); " override;\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print( printer->Print(
*vars, *vars,
"::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw(" "::grpc::ClientAsyncReader< $Response$>* $AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request, " "::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) override;\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n");
}
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print(*vars, printer->Print(*vars,
"::grpc::ClientReaderWriter< $Request$, $Response$>* " "::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Method$Raw(::grpc::ClientContext* context) override;\n"); "$Method$Raw(::grpc::ClientContext* context) override;\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncRawArgs"] = async_prefix.raw_args;
printer->Print(
*vars,
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) override;\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n");
}
} }
} }
} }
@ -1077,6 +1172,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
(*vars)["Method"] = method->name(); (*vars)["Method"] = method->name();
(*vars)["Request"] = method->input_type_name(); (*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name(); (*vars)["Response"] = method->output_type_name();
struct {
grpc::string prefix;
grpc::string start; // bool literal expressed as string
grpc::string method_params; // extra arguments to method
grpc::string create_args; // extra arguments to creator
} async_prefixes[] = {{"Async", "true", ", void* tag", ", tag"},
{"PrepareAsync", "false", "", ", nullptr"}};
if (method->NoStreaming()) { if (method->NoStreaming()) {
printer->Print(*vars, printer->Print(*vars,
"::grpc::Status $ns$$Service$::Stub::$Method$(" "::grpc::Status $ns$$Service$::Stub::$Method$("
@ -1087,10 +1189,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, request, response);\n" "context, request, response);\n"
"}\n\n"); "}\n\n");
printer->Print( for (auto async_prefix : async_prefixes) {
*vars, (*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
printer->Print(*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* " "::grpc::ClientAsyncResponseReader< $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::"
"ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Print(*vars, printer->Print(*vars,
@ -1098,8 +1203,9 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientAsyncResponseReader< $Response$>::Create(" "::grpc::ClientAsyncResponseReader< $Response$>::Create("
"channel_.get(), cq, " "channel_.get(), cq, "
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, request);\n" "context, request, $AsyncStart$);\n"
"}\n\n"); "}\n\n");
}
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* " "::grpc::ClientWriter< $Request$>* "
@ -1111,17 +1217,23 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, response);\n" "context, response);\n"
"}\n\n"); "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncCreateArgs"] = async_prefix.create_args;
printer->Print(*vars, printer->Print(*vars,
"::grpc::ClientAsyncWriter< $Request$>* " "::grpc::ClientAsyncWriter< $Request$>* "
"$ns$$Service$::Stub::Async$Method$Raw(" "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, " "::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Print(*vars, printer->Print(*vars,
" return ::grpc::ClientAsyncWriter< $Request$>::Create(" " return ::grpc::ClientAsyncWriter< $Request$>::Create("
"channel_.get(), cq, " "channel_.get(), cq, "
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, response, tag);\n" "context, response, $AsyncStart$$AsyncCreateArgs$);\n"
"}\n\n"); "}\n\n");
}
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -1134,17 +1246,24 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, request);\n" "context, request);\n"
"}\n\n"); "}\n\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncCreateArgs"] = async_prefix.create_args;
printer->Print(
*vars,
"::grpc::ClientAsyncReader< $Response$>* " "::grpc::ClientAsyncReader< $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw(" "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request, " "::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Print(*vars, printer->Print(*vars,
" return ::grpc::ClientAsyncReader< $Response$>::Create(" " return ::grpc::ClientAsyncReader< $Response$>::Create("
"channel_.get(), cq, " "channel_.get(), cq, "
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, request, tag);\n" "context, request, $AsyncStart$$AsyncCreateArgs$);\n"
"}\n\n"); "}\n\n");
}
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print( printer->Print(
*vars, *vars,
@ -1157,21 +1276,27 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context);\n" "context);\n"
"}\n\n"); "}\n\n");
printer->Print( for (auto async_prefix : async_prefixes) {
*vars, (*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["AsyncCreateArgs"] = async_prefix.create_args;
printer->Print(*vars,
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::"
"::grpc::CompletionQueue* cq, void* tag) {\n"); "ClientContext* context, "
"::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
printer->Print( printer->Print(
*vars, *vars,
" return " " return "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create("
"channel_.get(), cq, " "channel_.get(), cq, "
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, tag);\n" "context, $AsyncStart$$AsyncCreateArgs$);\n"
"}\n\n"); "}\n\n");
} }
} }
}
void PrintSourceServerMethod(grpc_generator::Printer *printer, void PrintSourceServerMethod(grpc_generator::Printer *printer,
const grpc_generator::Method *method, const grpc_generator::Method *method,
@ -1460,50 +1585,79 @@ void PrintMockClientMethods(grpc_generator::Printer *printer,
(*vars)["Request"] = method->input_type_name(); (*vars)["Request"] = method->input_type_name();
(*vars)["Response"] = method->output_type_name(); (*vars)["Response"] = method->output_type_name();
struct {
grpc::string prefix;
grpc::string method_params; // extra arguments to method
int extra_method_param_count;
} async_prefixes[] = {{"Async", ", void* tag", 1}, {"PrepareAsync", "", 0}};
if (method->NoStreaming()) { if (method->NoStreaming()) {
printer->Print( printer->Print(
*vars, *vars,
"MOCK_METHOD3($Method$, ::grpc::Status(::grpc::ClientContext* context, " "MOCK_METHOD3($Method$, ::grpc::Status(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response));\n"); "const $Request$& request, $Response$* response));\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
"MOCK_METHOD3(Async$Method$Raw, " (*vars)["AsyncPrefix"] = async_prefix.prefix;
printer->Print(
*vars,
"MOCK_METHOD3($AsyncPrefix$$Method$Raw, "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>*" "::grpc::ClientAsyncResponseReaderInterface< $Response$>*"
"(::grpc::ClientContext* context, const $Request$& request, " "(::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq));\n"); "::grpc::CompletionQueue* cq));\n");
}
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
"MOCK_METHOD2($Method$Raw, " "MOCK_METHOD2($Method$Raw, "
"::grpc::ClientWriterInterface< $Request$>*" "::grpc::ClientWriterInterface< $Request$>*"
"(::grpc::ClientContext* context, $Response$* response));\n"); "(::grpc::ClientContext* context, $Response$* response));\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["MockArgs"] =
std::to_string(3 + async_prefix.extra_method_param_count);
printer->Print(*vars, printer->Print(*vars,
"MOCK_METHOD4(Async$Method$Raw, " "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, "
"::grpc::ClientAsyncWriterInterface< $Request$>*" "::grpc::ClientAsyncWriterInterface< $Request$>*"
"(::grpc::ClientContext* context, $Response$* response, " "(::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag));\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$));\n");
}
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
"MOCK_METHOD2($Method$Raw, " "MOCK_METHOD2($Method$Raw, "
"::grpc::ClientReaderInterface< $Response$>*" "::grpc::ClientReaderInterface< $Response$>*"
"(::grpc::ClientContext* context, const $Request$& request));\n"); "(::grpc::ClientContext* context, const $Request$& request));\n");
printer->Print(*vars, for (auto async_prefix : async_prefixes) {
"MOCK_METHOD4(Async$Method$Raw, " (*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["MockArgs"] =
std::to_string(3 + async_prefix.extra_method_param_count);
printer->Print(
*vars,
"MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, "
"::grpc::ClientAsyncReaderInterface< $Response$>*" "::grpc::ClientAsyncReaderInterface< $Response$>*"
"(::grpc::ClientContext* context, const $Request$& request, " "(::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag));\n"); "::grpc::CompletionQueue* cq$AsyncMethodParams$));\n");
}
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print( printer->Print(
*vars, *vars,
"MOCK_METHOD1($Method$Raw, " "MOCK_METHOD1($Method$Raw, "
"::grpc::ClientReaderWriterInterface< $Request$, $Response$>*" "::grpc::ClientReaderWriterInterface< $Request$, $Response$>*"
"(::grpc::ClientContext* context));\n"); "(::grpc::ClientContext* context));\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncMethodParams"] = async_prefix.method_params;
(*vars)["MockArgs"] =
std::to_string(2 + async_prefix.extra_method_param_count);
printer->Print( printer->Print(
*vars, *vars,
"MOCK_METHOD3(Async$Method$Raw, " "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, "
"::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*" "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*"
"(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, " "(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq"
"void* tag));\n"); "$AsyncMethodParams$));\n");
}
} }
} }

@ -22,14 +22,29 @@
namespace grpc { namespace grpc {
namespace {
std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal(
ChannelInterface* channel, ClientContext* context,
const grpc::string& method, CompletionQueue* cq, bool start, void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>(
GenericClientAsyncReaderWriter::Create(
channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING),
context, start, tag));
}
} // namespace
// begin a call to a named method // begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
ClientContext* context, const grpc::string& method, CompletionQueue* cq, ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag) { void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>( return CallInternal(channel_.get(), context, method, cq, true, tag);
GenericClientAsyncReaderWriter::Create( }
channel_.get(), cq,
RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); // setup a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::PrepareCall(
ClientContext* context, const grpc::string& method, CompletionQueue* cq) {
return CallInternal(channel_.get(), context, method, cq, false, nullptr);
} }
} // namespace grpc } // namespace grpc

@ -65,6 +65,9 @@ class ServiceA final {
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq)); return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq));
} }
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodA1Raw(context, request, cq));
}
// MethodA1 trailing comment 1 // MethodA1 trailing comment 1
// MethodA2 detached leading comment 1 // MethodA2 detached leading comment 1
// //
@ -76,6 +79,9 @@ class ServiceA final {
std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) { std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag)); return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag));
} }
std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>> PrepareAsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(PrepareAsyncMethodA2Raw(context, response, cq));
}
// MethodA2 trailing comment 1 // MethodA2 trailing comment 1
// Method A3 leading comment 1 // Method A3 leading comment 1
std::unique_ptr< ::grpc::ClientReaderInterface< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) { std::unique_ptr< ::grpc::ClientReaderInterface< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) {
@ -84,6 +90,9 @@ class ServiceA final {
std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) { std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag)); return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag));
} }
std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodA3Raw(context, request, cq));
}
// Method A3 trailing comment 1 // Method A3 trailing comment 1
// Method A4 leading comment 1 // Method A4 leading comment 1
std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) { std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) {
@ -92,15 +101,22 @@ class ServiceA final {
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) { std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag)); return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag));
} }
std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq));
}
// Method A4 trailing comment 1 // Method A4 trailing comment 1
private: private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientWriterInterface< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) = 0; virtual ::grpc::ClientWriterInterface< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) = 0;
virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) = 0; virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) = 0;
virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* PrepareAsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientReaderInterface< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) = 0; virtual ::grpc::ClientReaderInterface< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) = 0;
virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) = 0; virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) = 0;
virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) = 0; virtual ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) = 0;
virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0; virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0;
virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) = 0;
}; };
class Stub final : public StubInterface { class Stub final : public StubInterface {
public: public:
@ -109,34 +125,50 @@ class ServiceA final {
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq)); return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq));
} }
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodA1Raw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>> MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response) { std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>> MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response) {
return std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>>(MethodA2Raw(context, response)); return std::unique_ptr< ::grpc::ClientWriter< ::grpc::testing::Request>>(MethodA2Raw(context, response));
} }
std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) { std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag)); return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag));
} }
std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> PrepareAsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(PrepareAsyncMethodA2Raw(context, response, cq));
}
std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) { std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) {
return std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>>(MethodA3Raw(context, request)); return std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>>(MethodA3Raw(context, request));
} }
std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) { std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag)); return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag));
} }
std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> PrepareAsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(PrepareAsyncMethodA3Raw(context, request, cq));
}
std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) { std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) {
return std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(MethodA4Raw(context)); return std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(MethodA4Raw(context));
} }
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) { std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag)); return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag));
} }
std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq));
}
private: private:
std::shared_ptr< ::grpc::ChannelInterface> channel_; std::shared_ptr< ::grpc::ChannelInterface> channel_;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override; ::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override;
::grpc::ClientAsyncWriter< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientAsyncWriter< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientAsyncWriter< ::grpc::testing::Request>* PrepareAsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq) override;
::grpc::ClientReader< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) override; ::grpc::ClientReader< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) override;
::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientAsyncReader< ::grpc::testing::Response>* PrepareAsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override;
::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override;
const ::grpc::RpcMethod rpcmethod_MethodA1_; const ::grpc::RpcMethod rpcmethod_MethodA1_;
const ::grpc::RpcMethod rpcmethod_MethodA2_; const ::grpc::RpcMethod rpcmethod_MethodA2_;
const ::grpc::RpcMethod rpcmethod_MethodA3_; const ::grpc::RpcMethod rpcmethod_MethodA3_;
@ -372,9 +404,13 @@ class ServiceB final {
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq)); return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq));
} }
std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq));
}
// MethodB1 trailing comment 1 // MethodB1 trailing comment 1
private: private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
}; };
class Stub final : public StubInterface { class Stub final : public StubInterface {
public: public:
@ -383,10 +419,14 @@ class ServiceB final {
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq)); return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq));
} }
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq));
}
private: private:
std::shared_ptr< ::grpc::ChannelInterface> channel_; std::shared_ptr< ::grpc::ChannelInterface> channel_;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
const ::grpc::RpcMethod rpcmethod_MethodB1_; const ::grpc::RpcMethod rpcmethod_MethodB1_;
}; };
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());

@ -15,18 +15,23 @@ class MockServiceAStub : public ServiceA::StubInterface {
public: public:
MOCK_METHOD3(MethodA1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response)); MOCK_METHOD3(MethodA1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response));
MOCK_METHOD3(AsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); MOCK_METHOD3(AsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq));
MOCK_METHOD3(PrepareAsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq));
MOCK_METHOD2(MethodA2Raw, ::grpc::ClientWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response)); MOCK_METHOD2(MethodA2Raw, ::grpc::ClientWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response));
MOCK_METHOD4(AsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag)); MOCK_METHOD4(AsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag));
MOCK_METHOD3(PrepareAsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq));
MOCK_METHOD2(MethodA3Raw, ::grpc::ClientReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request)); MOCK_METHOD2(MethodA3Raw, ::grpc::ClientReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request));
MOCK_METHOD4(AsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag)); MOCK_METHOD4(AsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag));
MOCK_METHOD3(PrepareAsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq));
MOCK_METHOD1(MethodA4Raw, ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context)); MOCK_METHOD1(MethodA4Raw, ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context));
MOCK_METHOD3(AsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag)); MOCK_METHOD3(AsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag));
MOCK_METHOD2(PrepareAsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq));
}; };
class MockServiceBStub : public ServiceB::StubInterface { class MockServiceBStub : public ServiceB::StubInterface {
public: public:
MOCK_METHOD3(MethodB1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response)); MOCK_METHOD3(MethodB1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response));
MOCK_METHOD3(AsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq)); MOCK_METHOD3(AsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq));
MOCK_METHOD3(PrepareAsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq));
}; };
} // namespace grpc } // namespace grpc

@ -56,11 +56,6 @@ class ClientRpcContext {
} }
virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0; virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
void lock() { mu_.lock(); }
void unlock() { mu_.unlock(); }
private:
std::mutex mu_;
}; };
template <class RequestType, class ResponseType> template <class RequestType, class ResponseType>
@ -73,7 +68,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> CompletionQueue*)>
start_req, prepare_req,
std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done) std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
@ -83,7 +78,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_(State::READY), next_state_(State::READY),
callback_(on_done), callback_(on_done),
next_issue_(next_issue), next_issue_(next_issue),
start_req_(start_req) {} prepare_req_(prepare_req) {}
~ClientRpcContextUnaryImpl() override {} ~ClientRpcContextUnaryImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override { void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq); StartInternal(cq);
@ -92,7 +87,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
switch (next_state_) { switch (next_state_) {
case State::READY: case State::READY:
start_ = UsageTimer::Now(); start_ = UsageTimer::Now();
response_reader_ = start_req_(stub_, &context_, req_, cq_); response_reader_ = prepare_req_(stub_, &context_, req_, cq_);
response_reader_->StartCall();
next_state_ = State::RESP_DONE; next_state_ = State::RESP_DONE;
response_reader_->Finish(&response_, &status_, response_reader_->Finish(&response_, &status_,
ClientRpcContext::tag(this)); ClientRpcContext::tag(this));
@ -111,8 +107,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
} }
void StartNewClone(CompletionQueue* cq) override { void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
start_req_, callback_); prepare_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq); clone->StartInternal(cq);
} }
@ -130,7 +125,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> CompletionQueue*)>
start_req_; prepare_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@ -252,29 +247,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
// this thread isn't supposed to shut down // this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) { if (shutdown_state_[thread_idx]->shutdown) {
// We want to delete the context. However, it is possible that
// another thread that just initiated an action on this
// context still has its lock even though the action on the
// context has completed. To delay for that, just grab the
// lock for serialization. Take a new scope.
{ std::lock_guard<ClientRpcContext> lctx(*ctx); }
delete ctx; delete ctx;
return true; return true;
} }
bool del = false;
// Create a new scope for a lock_guard'ed region
{
std::lock_guard<ClientRpcContext> lctx(*ctx);
if (!ctx->RunNextState(ok, entry)) { if (!ctx->RunNextState(ok, entry)) {
// The RPC and callback are done, so clone the ctx // The RPC and callback are done, so clone the ctx
// and kickstart the new one // and kickstart the new one
ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
// set the old version to delete
del = true;
}
}
if (del) {
delete ctx; delete ctx;
} }
return true; return true;
@ -311,15 +290,15 @@ class AsyncUnaryClient final
entry->set_status(s.error_code()); entry->set_status(s.error_code());
} }
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) { const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq); return stub->PrepareAsyncUnaryCall(ctx, request, cq);
}; };
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
std::function<gpr_timespec()> next_issue, std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) { const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, req, next_issue, AsyncUnaryClient::StartReq, stub, req, next_issue, AsyncUnaryClient::PrepareReq,
AsyncUnaryClient::CheckDone); AsyncUnaryClient::CheckDone);
} }
}; };
@ -332,9 +311,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue, std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr< std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
void*)> prepare_req,
start_req,
std::function<void(grpc::Status, ResponseType*)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
@ -344,7 +322,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
next_state_(State::INVALID), next_state_(State::INVALID),
callback_(on_done), callback_(on_done),
next_issue_(next_issue), next_issue_(next_issue),
start_req_(start_req) {} prepare_req_(prepare_req) {}
~ClientRpcContextStreamingPingPongImpl() override {} ~ClientRpcContextStreamingPingPongImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override { void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq, config.messages_per_stream()); StartInternal(cq, config.messages_per_stream());
@ -407,8 +385,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
} }
void StartNewClone(CompletionQueue* cq) override { void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingPingPongImpl( auto* clone = new ClientRpcContextStreamingPingPongImpl(
stub_, req_, next_issue_, start_req_, callback_); stub_, req_, next_issue_, prepare_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_); clone->StartInternal(cq, messages_per_stream_);
} }
@ -432,10 +409,10 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
State next_state_; State next_state_;
std::function<void(grpc::Status, ResponseType*)> callback_; std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<gpr_timespec()> next_issue_; std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr< std::function<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
start_req_; prepare_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@ -449,8 +426,9 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
cq_ = cq; cq_ = cq;
messages_per_stream_ = messages_per_stream; messages_per_stream_ = messages_per_stream;
messages_issued_ = 0; messages_issued_ = 0;
stream_ = prepare_req_(stub_, &context_, cq);
next_state_ = State::STREAM_IDLE; next_state_ = State::STREAM_IDLE;
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); stream_->StartCall(ClientRpcContext::tag(this));
} }
}; };
@ -469,9 +447,9 @@ class AsyncStreamingPingPongClient final
static void CheckDone(grpc::Status s, SimpleResponse* response) {} static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr< static std::unique_ptr<
grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>> grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
CompletionQueue* cq, void* tag) { CompletionQueue* cq) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag); auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);
return stream; return stream;
}; };
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
@ -479,7 +457,7 @@ class AsyncStreamingPingPongClient final
const SimpleRequest& req) { const SimpleRequest& req) {
return new ClientRpcContextStreamingPingPongImpl<SimpleRequest, return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
SimpleResponse>( SimpleResponse>(
stub, req, next_issue, AsyncStreamingPingPongClient::StartReq, stub, req, next_issue, AsyncStreamingPingPongClient::PrepareReq,
AsyncStreamingPingPongClient::CheckDone); AsyncStreamingPingPongClient::CheckDone);
} }
}; };
@ -492,8 +470,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue, std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
CompletionQueue*, void*)> CompletionQueue*)>
start_req, prepare_req,
std::function<void(grpc::Status, ResponseType*)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
@ -503,7 +481,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
next_state_(State::INVALID), next_state_(State::INVALID),
callback_(on_done), callback_(on_done),
next_issue_(next_issue), next_issue_(next_issue),
start_req_(start_req) {} prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromClientImpl() override {} ~ClientRpcContextStreamingFromClientImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override { void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq); StartInternal(cq);
@ -546,8 +524,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
} }
void StartNewClone(CompletionQueue* cq) override { void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromClientImpl( auto* clone = new ClientRpcContextStreamingFromClientImpl(
stub_, req_, next_issue_, start_req_, callback_); stub_, req_, next_issue_, prepare_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq); clone->StartInternal(cq);
} }
@ -570,17 +547,17 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue_; std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>( std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*, BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
CompletionQueue*, void*)> CompletionQueue*)>
start_req_; prepare_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_; std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;
void StartInternal(CompletionQueue* cq) { void StartInternal(CompletionQueue* cq) {
cq_ = cq; cq_ = cq;
stream_ = start_req_(stub_, &context_, &response_, cq, stream_ = prepare_req_(stub_, &context_, &response_, cq);
ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE; next_state_ = State::STREAM_IDLE;
stream_->StartCall(ClientRpcContext::tag(this));
} }
}; };
@ -597,10 +574,10 @@ class AsyncStreamingFromClientClient final
private: private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {} static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> StartReq( static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(
BenchmarkService::Stub* stub, grpc::ClientContext* ctx, BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
SimpleResponse* resp, CompletionQueue* cq, void* tag) { SimpleResponse* resp, CompletionQueue* cq) {
auto stream = stub->AsyncStreamingFromClient(ctx, resp, cq, tag); auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);
return stream; return stream;
}; };
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
@ -608,7 +585,7 @@ class AsyncStreamingFromClientClient final
const SimpleRequest& req) { const SimpleRequest& req) {
return new ClientRpcContextStreamingFromClientImpl<SimpleRequest, return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
SimpleResponse>( SimpleResponse>(
stub, req, next_issue, AsyncStreamingFromClientClient::StartReq, stub, req, next_issue, AsyncStreamingFromClientClient::PrepareReq,
AsyncStreamingFromClientClient::CheckDone); AsyncStreamingFromClientClient::CheckDone);
} }
}; };
@ -621,8 +598,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue, std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*, void*)> CompletionQueue*)>
start_req, prepare_req,
std::function<void(grpc::Status, ResponseType*)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
@ -632,7 +609,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
next_state_(State::INVALID), next_state_(State::INVALID),
callback_(on_done), callback_(on_done),
next_issue_(next_issue), next_issue_(next_issue),
start_req_(start_req) {} prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromServerImpl() override {} ~ClientRpcContextStreamingFromServerImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override { void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq); StartInternal(cq);
@ -664,8 +641,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
} }
void StartNewClone(CompletionQueue* cq) override { void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromServerImpl( auto* clone = new ClientRpcContextStreamingFromServerImpl(
stub_, req_, next_issue_, start_req_, callback_); stub_, req_, next_issue_, prepare_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq); clone->StartInternal(cq);
} }
@ -682,8 +658,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue_; std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>( std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*, void*)> CompletionQueue*)>
start_req_; prepare_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_; std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;
@ -691,9 +667,9 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
void StartInternal(CompletionQueue* cq) { void StartInternal(CompletionQueue* cq) {
// TODO(vjpai): Add support to rate-pace this // TODO(vjpai): Add support to rate-pace this
cq_ = cq; cq_ = cq;
stream_ = prepare_req_(stub_, &context_, req_, cq);
next_state_ = State::STREAM_IDLE; next_state_ = State::STREAM_IDLE;
stream_ = stream_->StartCall(ClientRpcContext::tag(this));
start_req_(stub_, &context_, req_, cq, ClientRpcContext::tag(this));
} }
}; };
@ -710,10 +686,10 @@ class AsyncStreamingFromServerClient final
private: private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {} static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> StartReq( static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(
BenchmarkService::Stub* stub, grpc::ClientContext* ctx, BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& req, CompletionQueue* cq, void* tag) { const SimpleRequest& req, CompletionQueue* cq) {
auto stream = stub->AsyncStreamingFromServer(ctx, req, cq, tag); auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);
return stream; return stream;
}; };
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
@ -721,7 +697,7 @@ class AsyncStreamingFromServerClient final
const SimpleRequest& req) { const SimpleRequest& req) {
return new ClientRpcContextStreamingFromServerImpl<SimpleRequest, return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
SimpleResponse>( SimpleResponse>(
stub, req, next_issue, AsyncStreamingFromServerClient::StartReq, stub, req, next_issue, AsyncStreamingFromServerClient::PrepareReq,
AsyncStreamingFromServerClient::CheckDone); AsyncStreamingFromServerClient::CheckDone);
} }
}; };
@ -733,8 +709,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue, std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, grpc::GenericStub*, grpc::ClientContext*,
const grpc::string& method_name, CompletionQueue*, void*)> const grpc::string& method_name, CompletionQueue*)>
start_req, prepare_req,
std::function<void(grpc::Status, ByteBuffer*)> on_done) std::function<void(grpc::Status, ByteBuffer*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
@ -744,7 +720,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
next_state_(State::INVALID), next_state_(State::INVALID),
callback_(on_done), callback_(on_done),
next_issue_(next_issue), next_issue_(next_issue),
start_req_(start_req) {} prepare_req_(prepare_req) {}
~ClientRpcContextGenericStreamingImpl() override {} ~ClientRpcContextGenericStreamingImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override { void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq, config.messages_per_stream()); StartInternal(cq, config.messages_per_stream());
@ -807,8 +783,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
} }
void StartNewClone(CompletionQueue* cq) override { void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextGenericStreamingImpl( auto* clone = new ClientRpcContextGenericStreamingImpl(
stub_, req_, next_issue_, start_req_, callback_); stub_, req_, next_issue_, prepare_req_, callback_);
std::lock_guard<ClientRpcContext> lclone(*clone);
clone->StartInternal(cq, messages_per_stream_); clone->StartInternal(cq, messages_per_stream_);
} }
@ -834,8 +809,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue_; std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
CompletionQueue*, void*)> CompletionQueue*)>
start_req_; prepare_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
@ -850,9 +825,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
"/grpc.testing.BenchmarkService/StreamingCall"); "/grpc.testing.BenchmarkService/StreamingCall");
messages_per_stream_ = messages_per_stream; messages_per_stream_ = messages_per_stream;
messages_issued_ = 0; messages_issued_ = 0;
stream_ = prepare_req_(stub_, &context_, kMethodName, cq);
next_state_ = State::STREAM_IDLE; next_state_ = State::STREAM_IDLE;
stream_ = start_req_(stub_, &context_, kMethodName, cq, stream_->StartCall(ClientRpcContext::tag(this));
ClientRpcContext::tag(this));
} }
}; };
@ -874,17 +849,17 @@ class GenericAsyncStreamingClient final
private: private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {} static void CheckDone(grpc::Status s, ByteBuffer* response) {}
static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq( static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(
grpc::GenericStub* stub, grpc::ClientContext* ctx, grpc::GenericStub* stub, grpc::ClientContext* ctx,
const grpc::string& method_name, CompletionQueue* cq, void* tag) { const grpc::string& method_name, CompletionQueue* cq) {
auto stream = stub->Call(ctx, method_name, cq, tag); auto stream = stub->PrepareCall(ctx, method_name, cq);
return stream; return stream;
}; };
static ClientRpcContext* SetupCtx(grpc::GenericStub* stub, static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
std::function<gpr_timespec()> next_issue, std::function<gpr_timespec()> next_issue,
const ByteBuffer& req) { const ByteBuffer& req) {
return new ClientRpcContextGenericStreamingImpl( return new ClientRpcContextGenericStreamingImpl(
stub, req, next_issue, GenericAsyncStreamingClient::StartReq, stub, req, next_issue, GenericAsyncStreamingClient::PrepareReq,
GenericAsyncStreamingClient::CheckDone); GenericAsyncStreamingClient::CheckDone);
} }
}; };

Loading…
Cancel
Save