Stop exposing streaming object class

pull/17104/head
Vijay Pai 6 years ago
parent dac2066a1c
commit ea1156da3f
  1. 6
      include/grpcpp/generic/generic_stub.h
  2. 302
      include/grpcpp/impl/codegen/client_callback.h
  3. 96
      src/compiler/cpp_generator.cc
  4. 15
      src/cpp/client/generic_stub.cc
  5. 1105
      test/cpp/codegen/compiler_test_golden
  6. 63
      test/cpp/end2end/client_callback_end2end_test.cc

@ -77,9 +77,9 @@ class GenericStub final {
const ByteBuffer* request, ByteBuffer* response, const ByteBuffer* request, ByteBuffer* response,
std::function<void(Status)> on_completion); std::function<void(Status)> on_completion);
experimental::ClientCallbackReaderWriter<ByteBuffer, ByteBuffer>* void PrepareBidiStreamingCall(
PrepareBidiStreamingCall(ClientContext* context, const grpc::string& method, ClientContext* context, const grpc::string& method,
experimental::ClientBidiReactor* reactor); experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor);
private: private:
GenericStub* stub_; GenericStub* stub_;

@ -93,9 +93,67 @@ class CallbackUnaryCallImpl {
namespace experimental { namespace experimental {
// Forward declarations
template <class Request, class Response>
class ClientBidiReactor;
template <class Response>
class ClientReadReactor;
template <class Request>
class ClientWriteReactor;
// NOTE: The streaming objects are not actually implemented in the public API.
// These interfaces are provided for mocking only. Typical applications
// will interact exclusively with the reactors that they define.
template <class Request, class Response>
class ClientCallbackReaderWriter {
public:
virtual ~ClientCallbackReaderWriter() {}
virtual void StartCall() = 0;
virtual void Write(const Request* req, WriteOptions options) = 0;
virtual void WritesDone() = 0;
virtual void Read(Response* resp) = 0;
protected:
void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
reactor->BindStream(this);
}
};
template <class Response>
class ClientCallbackReader {
public:
virtual ~ClientCallbackReader() {}
virtual void StartCall() = 0;
virtual void Read(Response* resp) = 0;
protected:
void BindReactor(ClientReadReactor<Response>* reactor) {
reactor->BindReader(this);
}
};
template <class Request>
class ClientCallbackWriter {
public:
virtual ~ClientCallbackWriter() {}
virtual void StartCall() = 0;
void Write(const Request* req) { Write(req, WriteOptions()); }
virtual void Write(const Request* req, WriteOptions options) = 0;
void WriteLast(const Request* req, WriteOptions options) {
Write(req, options.set_last_message());
}
virtual void WritesDone() = 0;
protected:
void BindReactor(ClientWriteReactor<Request>* reactor) {
reactor->BindWriter(this);
}
};
// The user must implement this reactor interface with reactions to each event // The user must implement this reactor interface with reactions to each event
// type that gets called by the library. An empty reaction is provided by // type that gets called by the library. An empty reaction is provided by
// default // default
template <class Request, class Response>
class ClientBidiReactor { class ClientBidiReactor {
public: public:
virtual ~ClientBidiReactor() {} virtual ~ClientBidiReactor() {}
@ -104,16 +162,44 @@ class ClientBidiReactor {
virtual void OnReadDone(bool ok) {} virtual void OnReadDone(bool ok) {}
virtual void OnWriteDone(bool ok) {} virtual void OnWriteDone(bool ok) {}
virtual void OnWritesDoneDone(bool ok) {} virtual void OnWritesDoneDone(bool ok) {}
void StartCall() { stream_->StartCall(); }
void StartRead(Response* resp) { stream_->Read(resp); }
void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
void StartWrite(const Request* req, WriteOptions options) {
stream_->Write(req, std::move(options));
}
void StartWriteLast(const Request* req, WriteOptions options) {
StartWrite(req, std::move(options.set_last_message()));
}
void StartWritesDone() { stream_->WritesDone(); }
private:
friend class ClientCallbackReaderWriter<Request, Response>;
void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
stream_ = stream;
}
ClientCallbackReaderWriter<Request, Response>* stream_;
}; };
template <class Response>
class ClientReadReactor { class ClientReadReactor {
public: public:
virtual ~ClientReadReactor() {} virtual ~ClientReadReactor() {}
virtual void OnDone(Status s) {} virtual void OnDone(Status s) {}
virtual void OnReadInitialMetadataDone(bool ok) {} virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {} virtual void OnReadDone(bool ok) {}
void StartCall() { reader_->StartCall(); }
void StartRead(Response* resp) { reader_->Read(resp); }
private:
friend class ClientCallbackReader<Response>;
void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
ClientCallbackReader<Response>* reader_;
}; };
template <class Request>
class ClientWriteReactor { class ClientWriteReactor {
public: public:
virtual ~ClientWriteReactor() {} virtual ~ClientWriteReactor() {}
@ -121,41 +207,21 @@ class ClientWriteReactor {
virtual void OnReadInitialMetadataDone(bool ok) {} virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnWriteDone(bool ok) {} virtual void OnWriteDone(bool ok) {}
virtual void OnWritesDoneDone(bool ok) {} virtual void OnWritesDoneDone(bool ok) {}
};
template <class Request, class Response> void StartCall() { writer_->StartCall(); }
class ClientCallbackReaderWriter { void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
public: void StartWrite(const Request* req, WriteOptions options) {
virtual ~ClientCallbackReaderWriter() {} writer_->Write(req, std::move(options));
virtual void StartCall() = 0;
void Write(const Request* req) { Write(req, WriteOptions()); }
virtual void Write(const Request* req, WriteOptions options) = 0;
void WriteLast(const Request* req, WriteOptions options) {
Write(req, options.set_last_message());
} }
virtual void WritesDone() = 0; void StartWriteLast(const Request* req, WriteOptions options) {
virtual void Read(Response* resp) = 0; StartWrite(req, std::move(options.set_last_message()));
};
template <class Response>
class ClientCallbackReader {
public:
virtual ~ClientCallbackReader() {}
virtual void StartCall() = 0;
virtual void Read(Response* resp) = 0;
};
template <class Request>
class ClientCallbackWriter {
public:
virtual ~ClientCallbackWriter() {}
virtual void StartCall() = 0;
void Write(const Request* req) { Write(req, WriteOptions()); }
virtual void Write(const Request* req, WriteOptions options) = 0;
void WriteLast(const Request* req, WriteOptions options) {
Write(req, options.set_last_message());
} }
virtual void WritesDone() = 0; void StartWritesDone() { writer_->WritesDone(); }
private:
friend class ClientCallbackWriter<Request>;
void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
ClientCallbackWriter<Request>* writer_;
}; };
} // namespace experimental } // namespace experimental
@ -204,12 +270,13 @@ class ClientCallbackReaderWriterImpl
// 4. Any write backlog // 4. Any write backlog
started_ = true; started_ = true;
start_tag_.Set(call_.call(), start_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnReadInitialMetadataDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnReadInitialMetadataDone(ok);
}, MaybeFinish();
&start_ops_); },
&start_ops_);
if (!start_corked_) { if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags()); context_->initial_metadata_flags());
@ -220,27 +287,29 @@ class ClientCallbackReaderWriterImpl
// Also set up the read and write tags so that they don't have to be set up // Also set up the read and write tags so that they don't have to be set up
// each time // each time
write_tag_.Set(call_.call(), write_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnWriteDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnWriteDone(ok);
}, MaybeFinish();
&write_ops_); },
&write_ops_);
write_ops_.set_core_cq_tag(&write_tag_); write_ops_.set_core_cq_tag(&write_tag_);
read_tag_.Set(call_.call(), read_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnReadDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnReadDone(ok);
}, MaybeFinish();
&read_ops_); },
&read_ops_);
read_ops_.set_core_cq_tag(&read_tag_); read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) { if (read_ops_at_start_) {
call_.PerformOps(&read_ops_); call_.PerformOps(&read_ops_);
} }
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, finish_tag_.Set(
&finish_ops_); call_.call(), [this](bool ok) { MaybeFinish(); }, &finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_); finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_); call_.PerformOps(&finish_ops_);
@ -291,12 +360,13 @@ class ClientCallbackReaderWriterImpl
start_corked_ = false; start_corked_ = false;
} }
writes_done_ops_.ClientSendClose(); writes_done_ops_.ClientSendClose();
writes_done_tag_.Set(call_.call(), writes_done_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnWritesDoneDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnWritesDoneDone(ok);
}, MaybeFinish();
&writes_done_ops_); },
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_); writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++; callbacks_outstanding_++;
if (started_) { if (started_) {
@ -311,15 +381,17 @@ class ClientCallbackReaderWriterImpl
ClientCallbackReaderWriterImpl( ClientCallbackReaderWriterImpl(
Call call, ClientContext* context, Call call, ClientContext* context,
::grpc::experimental::ClientBidiReactor* reactor) ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor)
: context_(context), : context_(context),
call_(call), call_(call),
reactor_(reactor), reactor_(reactor),
start_corked_(context_->initial_metadata_corked_) {} start_corked_(context_->initial_metadata_corked_) {
this->BindReactor(reactor);
}
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
::grpc::experimental::ClientBidiReactor* reactor_; ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
CallbackWithSuccessTag start_tag_; CallbackWithSuccessTag start_tag_;
@ -350,14 +422,14 @@ class ClientCallbackReaderWriterImpl
template <class Request, class Response> template <class Request, class Response>
class ClientCallbackReaderWriterFactory { class ClientCallbackReaderWriterFactory {
public: public:
static experimental::ClientCallbackReaderWriter<Request, Response>* Create( static void Create(
ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
ClientContext* context, ClientContext* context,
::grpc::experimental::ClientBidiReactor* reactor) { ::grpc::experimental::ClientBidiReactor<Request, Response>* reactor) {
Call call = channel->CreateCall(method, context, channel->CallbackCQ()); Call call = channel->CreateCall(method, context, channel->CallbackCQ());
g_core_codegen_interface->grpc_call_ref(call.call()); g_core_codegen_interface->grpc_call_ref(call.call());
return new (g_core_codegen_interface->grpc_call_arena_alloc( new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>))) call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
ClientCallbackReaderWriterImpl<Request, Response>(call, context, ClientCallbackReaderWriterImpl<Request, Response>(call, context,
reactor); reactor);
@ -396,12 +468,13 @@ class ClientCallbackReaderImpl
// 3. Recv trailing metadata, on_completion callback // 3. Recv trailing metadata, on_completion callback
started_ = true; started_ = true;
start_tag_.Set(call_.call(), start_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnReadInitialMetadataDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnReadInitialMetadataDone(ok);
}, MaybeFinish();
&start_ops_); },
&start_ops_);
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags()); context_->initial_metadata_flags());
start_ops_.RecvInitialMetadata(context_); start_ops_.RecvInitialMetadata(context_);
@ -409,19 +482,20 @@ class ClientCallbackReaderImpl
call_.PerformOps(&start_ops_); call_.PerformOps(&start_ops_);
// Also set up the read tag so it doesn't have to be set up each time // Also set up the read tag so it doesn't have to be set up each time
read_tag_.Set(call_.call(), read_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnReadDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnReadDone(ok);
}, MaybeFinish();
&read_ops_); },
&read_ops_);
read_ops_.set_core_cq_tag(&read_tag_); read_ops_.set_core_cq_tag(&read_tag_);
if (read_ops_at_start_) { if (read_ops_at_start_) {
call_.PerformOps(&read_ops_); call_.PerformOps(&read_ops_);
} }
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, finish_tag_.Set(
&finish_ops_); call_.call(), [this](bool ok) { MaybeFinish(); }, &finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_); finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_); call_.PerformOps(&finish_ops_);
@ -441,9 +515,11 @@ class ClientCallbackReaderImpl
friend class ClientCallbackReaderFactory<Response>; friend class ClientCallbackReaderFactory<Response>;
template <class Request> template <class Request>
ClientCallbackReaderImpl(Call call, ClientContext* context, Request* request, ClientCallbackReaderImpl(
::grpc::experimental::ClientReadReactor* reactor) Call call, ClientContext* context, Request* request,
::grpc::experimental::ClientReadReactor<Response>* reactor)
: context_(context), call_(call), reactor_(reactor) { : context_(context), call_(call), reactor_(reactor) {
this->BindReactor(reactor);
// TODO(vjpai): don't assert // TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok()); GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok());
start_ops_.ClientSendClose(); start_ops_.ClientSendClose();
@ -451,7 +527,7 @@ class ClientCallbackReaderImpl
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
::grpc::experimental::ClientReadReactor* reactor_; ::grpc::experimental::ClientReadReactor<Response>* reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose, CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose,
CallOpRecvInitialMetadata> CallOpRecvInitialMetadata>
@ -475,14 +551,14 @@ template <class Response>
class ClientCallbackReaderFactory { class ClientCallbackReaderFactory {
public: public:
template <class Request> template <class Request>
static experimental::ClientCallbackReader<Response>* Create( static void Create(
ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
ClientContext* context, const Request* request, ClientContext* context, const Request* request,
::grpc::experimental::ClientReadReactor* reactor) { ::grpc::experimental::ClientReadReactor<Response>* reactor) {
Call call = channel->CreateCall(method, context, channel->CallbackCQ()); Call call = channel->CreateCall(method, context, channel->CallbackCQ());
g_core_codegen_interface->grpc_call_ref(call.call()); g_core_codegen_interface->grpc_call_ref(call.call());
return new (g_core_codegen_interface->grpc_call_arena_alloc( new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackReaderImpl<Response>))) call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
ClientCallbackReaderImpl<Response>(call, context, request, reactor); ClientCallbackReaderImpl<Response>(call, context, request, reactor);
} }
@ -520,12 +596,13 @@ class ClientCallbackWriterImpl
// 3. Any backlog // 3. Any backlog
started_ = true; started_ = true;
start_tag_.Set(call_.call(), start_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnReadInitialMetadataDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnReadInitialMetadataDone(ok);
}, MaybeFinish();
&start_ops_); },
&start_ops_);
if (!start_corked_) { if (!start_corked_) {
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_, start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags()); context_->initial_metadata_flags());
@ -536,16 +613,17 @@ class ClientCallbackWriterImpl
// Also set up the read and write tags so that they don't have to be set up // Also set up the read and write tags so that they don't have to be set up
// each time // each time
write_tag_.Set(call_.call(), write_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnWriteDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnWriteDone(ok);
}, MaybeFinish();
&write_ops_); },
&write_ops_);
write_ops_.set_core_cq_tag(&write_tag_); write_ops_.set_core_cq_tag(&write_tag_);
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); }, finish_tag_.Set(
&finish_ops_); call_.call(), [this](bool ok) { MaybeFinish(); }, &finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_); finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_); finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_); call_.PerformOps(&finish_ops_);
@ -586,12 +664,13 @@ class ClientCallbackWriterImpl
start_corked_ = false; start_corked_ = false;
} }
writes_done_ops_.ClientSendClose(); writes_done_ops_.ClientSendClose();
writes_done_tag_.Set(call_.call(), writes_done_tag_.Set(
[this](bool ok) { call_.call(),
reactor_->OnWritesDoneDone(ok); [this](bool ok) {
MaybeFinish(); reactor_->OnWritesDoneDone(ok);
}, MaybeFinish();
&writes_done_ops_); },
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_); writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++; callbacks_outstanding_++;
if (started_) { if (started_) {
@ -605,20 +684,21 @@ class ClientCallbackWriterImpl
friend class ClientCallbackWriterFactory<Request>; friend class ClientCallbackWriterFactory<Request>;
template <class Response> template <class Response>
ClientCallbackWriterImpl(Call call, ClientContext* context, ClientCallbackWriterImpl(
Response* response, Call call, ClientContext* context, Response* response,
::grpc::experimental::ClientWriteReactor* reactor) ::grpc::experimental::ClientWriteReactor<Request>* reactor)
: context_(context), : context_(context),
call_(call), call_(call),
reactor_(reactor), reactor_(reactor),
start_corked_(context_->initial_metadata_corked_) { start_corked_(context_->initial_metadata_corked_) {
this->BindReactor(reactor);
finish_ops_.RecvMessage(response); finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage(); finish_ops_.AllowNoMessage();
} }
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
::grpc::experimental::ClientWriteReactor* reactor_; ::grpc::experimental::ClientWriteReactor<Request>* reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
CallbackWithSuccessTag start_tag_; CallbackWithSuccessTag start_tag_;
@ -646,14 +726,14 @@ template <class Request>
class ClientCallbackWriterFactory { class ClientCallbackWriterFactory {
public: public:
template <class Response> template <class Response>
static experimental::ClientCallbackWriter<Request>* Create( static void Create(
ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, ChannelInterface* channel, const ::grpc::internal::RpcMethod& method,
ClientContext* context, Response* response, ClientContext* context, Response* response,
::grpc::experimental::ClientWriteReactor* reactor) { ::grpc::experimental::ClientWriteReactor<Request>* reactor) {
Call call = channel->CreateCall(method, context, channel->CallbackCQ()); Call call = channel->CreateCall(method, context, channel->CallbackCQ());
g_core_codegen_interface->grpc_call_ref(call.call()); g_core_codegen_interface->grpc_call_ref(call.call());
return new (g_core_codegen_interface->grpc_call_arena_alloc( new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackWriterImpl<Request>))) call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
ClientCallbackWriterImpl<Request>(call, context, response, reactor); ClientCallbackWriterImpl<Request>(call, context, response, reactor);
} }

@ -582,22 +582,21 @@ void PrintHeaderClientMethodCallbackInterfaces(
"std::function<void(::grpc::Status)>) = 0;\n"); "std::function<void(::grpc::Status)>) = 0;\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"virtual ::grpc::experimental::ClientCallbackWriter< " "virtual void $Method$(::grpc::ClientContext* context, "
"$Request$>* $Method$(::grpc::ClientContext* context, "
"$Response$* response, " "$Response$* response, "
"::grpc::experimental::ClientWriteReactor* reactor) = 0;\n"); "::grpc::experimental::ClientWriteReactor< $Request$>* "
"reactor) = 0;\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"virtual ::grpc::experimental::ClientCallbackReader< " "virtual void $Method$(::grpc::ClientContext* context, "
"$Response$>* $Method$(::grpc::ClientContext* context, "
"$Request$* request, " "$Request$* request, "
"::grpc::experimental::ClientReadReactor* reactor) = 0;\n"); "::grpc::experimental::ClientReadReactor< $Response$>* "
"reactor) = 0;\n");
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print( printer->Print(*vars,
*vars, "virtual void $Method$(::grpc::ClientContext* context, "
"virtual ::grpc::experimental::ClientCallbackReaderWriter< $Request$, " "::grpc::experimental::ClientBidiReactor< "
"$Response$>* $Method$(::grpc::ClientContext* context, " "$Request$,$Response$>* reactor) = 0;\n");
"::grpc::experimental::ClientBidiReactor* reactor) = 0;\n");
} }
} }
@ -644,26 +643,23 @@ void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer,
"const $Request$* request, $Response$* response, " "const $Request$* request, $Response$* response, "
"std::function<void(::grpc::Status)>) override;\n"); "std::function<void(::grpc::Status)>) override;\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(*vars,
*vars, "void $Method$(::grpc::ClientContext* context, "
"::grpc::experimental::ClientCallbackWriter< $Request$>* " "$Response$* response, "
"$Method$(::grpc::ClientContext* context, " "::grpc::experimental::ClientWriteReactor< $Request$>* "
"$Response$* response, " "reactor) override;\n");
"::grpc::experimental::ClientWriteReactor* reactor) override;\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print( printer->Print(*vars,
*vars, "void $Method$(::grpc::ClientContext* context, "
"::grpc::experimental::ClientCallbackReader< $Response$>* " "$Request$* request, "
"$Method$(::grpc::ClientContext* context, " "::grpc::experimental::ClientReadReactor< $Response$>* "
"$Request$* request, " "reactor) override;\n");
"::grpc::experimental::ClientReadReactor* reactor) override;\n");
} else if (method->BidiStreaming()) { } else if (method->BidiStreaming()) {
printer->Print( printer->Print(*vars,
*vars, "void $Method$(::grpc::ClientContext* context, "
"::grpc::experimental::ClientCallbackReaderWriter< $Request$, " "::grpc::experimental::ClientBidiReactor< "
"$Response$>* $Method$(::grpc::ClientContext* context, " "$Request$,$Response$>* reactor) override;\n");
"::grpc::experimental::ClientBidiReactor* reactor) override;\n");
} }
} }
@ -1637,13 +1633,12 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
printer->Print( printer->Print(
*vars, *vars,
"::grpc::experimental::ClientCallbackWriter< $Request$>* " "void $ns$$Service$::"
"$ns$$Service$::"
"Stub::experimental_async::$Method$(::grpc::ClientContext* context, " "Stub::experimental_async::$Method$(::grpc::ClientContext* context, "
"$Response$* response, " "$Response$* response, "
"::grpc::experimental::ClientWriteReactor* reactor) {\n"); "::grpc::experimental::ClientWriteReactor< $Request$>* reactor) {\n");
printer->Print(*vars, printer->Print(*vars,
" return ::grpc::internal::ClientCallbackWriterFactory< " " ::grpc::internal::ClientCallbackWriterFactory< "
"$Request$>::Create(" "$Request$>::Create("
"stub_->channel_.get(), " "stub_->channel_.get(), "
"stub_->rpcmethod_$Method$_, " "stub_->rpcmethod_$Method$_, "
@ -1682,14 +1677,14 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"context, request);\n" "context, request);\n"
"}\n\n"); "}\n\n");
printer->Print(
*vars,
"void $ns$$Service$::Stub::experimental_async::$Method$(::grpc::"
"ClientContext* context, "
"$Request$* request, "
"::grpc::experimental::ClientReadReactor< $Response$>* reactor) {\n");
printer->Print(*vars, printer->Print(*vars,
"::grpc::experimental::ClientCallbackReader< $Response$>* " " ::grpc::internal::ClientCallbackReaderFactory< "
"$ns$$Service$::Stub::experimental_async::$Method$(::grpc::"
"ClientContext* context, "
"$Request$* request, "
"::grpc::experimental::ClientReadReactor* reactor) {\n");
printer->Print(*vars,
" return ::grpc::internal::ClientCallbackReaderFactory< "
"$Response$>::Create(" "$Response$>::Create("
"stub_->channel_.get(), " "stub_->channel_.get(), "
"stub_->rpcmethod_$Method$_, " "stub_->rpcmethod_$Method$_, "
@ -1728,20 +1723,19 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"context);\n" "context);\n"
"}\n\n"); "}\n\n");
printer->Print(*vars,
"::grpc::experimental::ClientCallbackReaderWriter< "
"$Request$,$Response$>* "
"$ns$$Service$::Stub::experimental_async::$Method$(::grpc::"
"ClientContext* context, "
"::grpc::experimental::ClientBidiReactor* reactor) {\n");
printer->Print( printer->Print(
*vars, *vars,
" return ::grpc::internal::ClientCallbackReaderWriterFactory< " "void $ns$$Service$::Stub::experimental_async::$Method$(::grpc::"
"$Request$,$Response$>::Create(" "ClientContext* context, "
"stub_->channel_.get(), " "::grpc::experimental::ClientBidiReactor< $Request$,$Response$>* "
"stub_->rpcmethod_$Method$_, " "reactor) {\n");
"context, reactor);\n" printer->Print(*vars,
"}\n\n"); " ::grpc::internal::ClientCallbackReaderWriterFactory< "
"$Request$,$Response$>::Create("
"stub_->channel_.get(), "
"stub_->rpcmethod_$Method$_, "
"context, reactor);\n"
"}\n\n");
for (auto async_prefix : async_prefixes) { for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncPrefix"] = async_prefix.prefix;

@ -72,16 +72,13 @@ void GenericStub::experimental_type::UnaryCall(
context, request, response, std::move(on_completion)); context, request, response, std::move(on_completion));
} }
experimental::ClientCallbackReaderWriter<ByteBuffer, ByteBuffer>* void GenericStub::experimental_type::PrepareBidiStreamingCall(
GenericStub::experimental_type::PrepareBidiStreamingCall(
ClientContext* context, const grpc::string& method, ClientContext* context, const grpc::string& method,
experimental::ClientBidiReactor* reactor) { experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor) {
return internal::ClientCallbackReaderWriterFactory< internal::ClientCallbackReaderWriterFactory<ByteBuffer, ByteBuffer>::Create(
ByteBuffer, ByteBuffer>::Create(stub_->channel_.get(), stub_->channel_.get(),
internal::RpcMethod( internal::RpcMethod(method.c_str(), internal::RpcMethod::BIDI_STREAMING),
method.c_str(), context, reactor);
internal::RpcMethod::BIDI_STREAMING),
context, reactor);
} }
} // namespace grpc } // namespace grpc

File diff suppressed because it is too large Load Diff

@ -187,20 +187,20 @@ class ClientCallbackEnd2endTest
grpc::string test_string(""); grpc::string test_string("");
for (int i = 0; i < num_rpcs; i++) { for (int i = 0; i < num_rpcs; i++) {
test_string += "Hello world. "; test_string += "Hello world. ";
class Client : public grpc::experimental::ClientBidiReactor { class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
ByteBuffer> {
public: public:
Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name, Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
const grpc::string& test_str) { const grpc::string& test_str) {
stream_ = test->generic_stub_->experimental().PrepareBidiStreamingCall(
test->generic_stub_->experimental().PrepareBidiStreamingCall( &cli_ctx_, method_name, this);
&cli_ctx_, method_name, this);
request_.set_message(test_str); request_.set_message(test_str);
send_buf_ = SerializeToByteBuffer(&request_); send_buf_ = SerializeToByteBuffer(&request_);
stream_->Write(send_buf_.get()); StartWrite(send_buf_.get());
stream_->Read(&recv_buf_); StartRead(&recv_buf_);
stream_->StartCall(); StartCall();
} }
void OnWriteDone(bool ok) override { stream_->WritesDone(); } void OnWriteDone(bool ok) override { StartWritesDone(); }
void OnReadDone(bool ok) override { void OnReadDone(bool ok) override {
EchoResponse response; EchoResponse response;
EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response)); EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
@ -223,8 +223,6 @@ class ClientCallbackEnd2endTest
std::unique_ptr<ByteBuffer> send_buf_; std::unique_ptr<ByteBuffer> send_buf_;
ByteBuffer recv_buf_; ByteBuffer recv_buf_;
ClientContext cli_ctx_; ClientContext cli_ctx_;
experimental::ClientCallbackReaderWriter<ByteBuffer, ByteBuffer>*
stream_;
std::mutex mu_; std::mutex mu_;
std::condition_variable cv_; std::condition_variable cv_;
bool done_ = false; bool done_ = false;
@ -330,22 +328,21 @@ TEST_P(ClientCallbackEnd2endTest, RequestStream) {
} }
ResetStub(); ResetStub();
class Client : public grpc::experimental::ClientWriteReactor { class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> {
public: public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) { explicit Client(grpc::testing::EchoTestService::Stub* stub) {
context_.set_initial_metadata_corked(true); context_.set_initial_metadata_corked(true);
stream_ = stub->experimental_async()->RequestStream(&context_, &response_, stub->experimental_async()->RequestStream(&context_, &response_, this);
this); StartCall();
stream_->StartCall();
request_.set_message("Hello server."); request_.set_message("Hello server.");
stream_->Write(&request_); StartWrite(&request_);
} }
void OnWriteDone(bool ok) override { void OnWriteDone(bool ok) override {
writes_left_--; writes_left_--;
if (writes_left_ > 1) { if (writes_left_ > 1) {
stream_->Write(&request_); StartWrite(&request_);
} else if (writes_left_ == 1) { } else if (writes_left_ == 1) {
stream_->WriteLast(&request_, WriteOptions()); StartWriteLast(&request_, WriteOptions());
} }
} }
void OnDone(Status s) override { void OnDone(Status s) override {
@ -363,7 +360,6 @@ TEST_P(ClientCallbackEnd2endTest, RequestStream) {
} }
private: private:
::grpc::experimental::ClientCallbackWriter<EchoRequest>* stream_;
EchoRequest request_; EchoRequest request_;
EchoResponse response_; EchoResponse response_;
ClientContext context_; ClientContext context_;
@ -383,14 +379,13 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
} }
ResetStub(); ResetStub();
class Client : public grpc::experimental::ClientReadReactor { class Client : public grpc::experimental::ClientReadReactor<EchoResponse> {
public: public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) { explicit Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello client "); request_.set_message("Hello client ");
stream_ = stub->experimental_async()->ResponseStream(&context_, &request_, stub->experimental_async()->ResponseStream(&context_, &request_, this);
this); StartCall();
stream_->StartCall(); StartRead(&response_);
stream_->Read(&response_);
} }
void OnReadDone(bool ok) override { void OnReadDone(bool ok) override {
if (!ok) { if (!ok) {
@ -400,7 +395,7 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
EXPECT_EQ(response_.message(), EXPECT_EQ(response_.message(),
request_.message() + grpc::to_string(reads_complete_)); request_.message() + grpc::to_string(reads_complete_));
reads_complete_++; reads_complete_++;
stream_->Read(&response_); StartRead(&response_);
} }
} }
void OnDone(Status s) override { void OnDone(Status s) override {
@ -417,7 +412,6 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
} }
private: private:
::grpc::experimental::ClientCallbackReader<EchoResponse>* stream_;
EchoRequest request_; EchoRequest request_;
EchoResponse response_; EchoResponse response_;
ClientContext context_; ClientContext context_;
@ -436,14 +430,15 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
return; return;
} }
ResetStub(); ResetStub();
class Client : public grpc::experimental::ClientBidiReactor { class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
EchoResponse> {
public: public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) { explicit Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello fren "); request_.set_message("Hello fren ");
stream_ = stub->experimental_async()->BidiStream(&context_, this); stub->experimental_async()->BidiStream(&context_, this);
stream_->StartCall(); StartCall();
stream_->Read(&response_); StartRead(&response_);
stream_->Write(&request_); StartWrite(&request_);
} }
void OnReadDone(bool ok) override { void OnReadDone(bool ok) override {
if (!ok) { if (!ok) {
@ -452,15 +447,15 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(), request_.message()); EXPECT_EQ(response_.message(), request_.message());
reads_complete_++; reads_complete_++;
stream_->Read(&response_); StartRead(&response_);
} }
} }
void OnWriteDone(bool ok) override { void OnWriteDone(bool ok) override {
EXPECT_TRUE(ok); EXPECT_TRUE(ok);
if (++writes_complete_ == kServerDefaultResponseStreamsToSend) { if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
stream_->WritesDone(); StartWritesDone();
} else { } else {
stream_->Write(&request_); StartWrite(&request_);
} }
} }
void OnDone(Status s) override { void OnDone(Status s) override {
@ -477,8 +472,6 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
} }
private: private:
::grpc::experimental::ClientCallbackReaderWriter<EchoRequest, EchoResponse>*
stream_;
EchoRequest request_; EchoRequest request_;
EchoResponse response_; EchoResponse response_;
ClientContext context_; ClientContext context_;

Loading…
Cancel
Save