Merge pull request #18773 from vjpai/unary_reactor

C++ callback API: Add client-side unary reactor model
pull/18823/head
Vijay Pai 6 years ago committed by GitHub
commit 031f61e135
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      include/grpcpp/generic/generic_stub_impl.h
  2. 2
      include/grpcpp/impl/codegen/channel_interface.h
  3. 155
      include/grpcpp/impl/codegen/client_callback.h
  4. 2
      include/grpcpp/impl/codegen/client_context.h
  5. 42
      src/compiler/cpp_generator.cc
  6. 1
      src/proto/grpc/testing/echo_messages.proto
  7. 8
      test/cpp/codegen/compiler_test_golden
  8. 59
      test/cpp/end2end/client_callback_end2end_test.cc
  9. 23
      test/cpp/end2end/test_service_impl.cc

@ -82,6 +82,12 @@ class GenericStub final {
const grpc::ByteBuffer* request, grpc::ByteBuffer* response,
std::function<void(grpc::Status)> on_completion);
/// Setup and start a unary call to a named method \a method using
/// \a context and specifying the \a request and \a response buffers.
void UnaryCall(grpc::ClientContext* context, const grpc::string& method,
const grpc::ByteBuffer* request, grpc::ByteBuffer* response,
grpc::experimental::ClientUnaryReactor* reactor);
/// Setup a call to a named method \a method using \a context and tied to
/// \a reactor . Like any other bidi streaming RPC, it will not be activated
/// until StartCall is invoked on its reactor.

@ -58,6 +58,7 @@ template <class R>
class ClientCallbackReaderFactory;
template <class W>
class ClientCallbackWriterFactory;
class ClientCallbackUnaryFactory;
class InterceptedChannel;
} // namespace internal
@ -117,6 +118,7 @@ class ChannelInterface {
friend class ::grpc::internal::ClientCallbackReaderFactory;
template <class W>
friend class ::grpc::internal::ClientCallbackWriterFactory;
friend class ::grpc::internal::ClientCallbackUnaryFactory;
template <class InputMessage, class OutputMessage>
friend class ::grpc::internal::BlockingUnaryCallImpl;
template <class InputMessage, class OutputMessage>

@ -100,6 +100,7 @@ template <class Response>
class ClientReadReactor;
template <class Request>
class ClientWriteReactor;
class ClientUnaryReactor;
// NOTE: The streaming objects are not actually implemented in the public API.
// These interfaces are provided for mocking only. Typical applications
@ -157,6 +158,15 @@ class ClientCallbackWriter {
}
};
class ClientCallbackUnary {
public:
virtual ~ClientCallbackUnary() {}
virtual void StartCall() = 0;
protected:
void BindReactor(ClientUnaryReactor* reactor);
};
// The following classes are the reactor interfaces that are to be implemented
// by the user. They are passed in to the library as an argument to a call on a
// stub (either a codegen-ed call or a generic call). The streaming RPC is
@ -346,6 +356,36 @@ class ClientWriteReactor {
ClientCallbackWriter<Request>* writer_;
};
/// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
/// This is _not_ a common way of invoking a unary RPC. In practice, this
/// option should be used only if the unary RPC wants to receive initial
/// metadata without waiting for the response to complete. Most deployments of
/// RPC systems do not use this option, but it is needed for generality.
/// All public methods behave as in ClientBidiReactor.
/// StartCall is included for consistency with the other reactor flavors: even
/// though there are no StartRead or StartWrite operations to queue before the
/// call (that is part of the unary call itself) and there is no reactor object
/// being created as a result of this call, we keep a consistent 2-phase
/// initiation API among all the reactor flavors.
class ClientUnaryReactor {
public:
virtual ~ClientUnaryReactor() {}
void StartCall() { call_->StartCall(); }
virtual void OnDone(const Status& s) {}
virtual void OnReadInitialMetadataDone(bool ok) {}
private:
friend class ClientCallbackUnary;
void BindCall(ClientCallbackUnary* call) { call_ = call; }
ClientCallbackUnary* call_;
};
// Define function out-of-line from class to avoid forward declaration issue
inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
reactor->BindCall(this);
}
} // namespace experimental
namespace internal {
@ -512,9 +552,9 @@ class ClientCallbackReaderWriterImpl
this->BindReactor(reactor);
}
ClientContext* context_;
ClientContext* const context_;
Call call_;
::grpc::experimental::ClientBidiReactor<Request, Response>* reactor_;
::grpc::experimental::ClientBidiReactor<Request, Response>* const reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
CallbackWithSuccessTag start_tag_;
@ -651,9 +691,9 @@ class ClientCallbackReaderImpl
start_ops_.ClientSendClose();
}
ClientContext* context_;
ClientContext* const context_;
Call call_;
::grpc::experimental::ClientReadReactor<Response>* reactor_;
::grpc::experimental::ClientReadReactor<Response>* const reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose,
CallOpRecvInitialMetadata>
@ -824,9 +864,9 @@ class ClientCallbackWriterImpl
finish_ops_.AllowNoMessage();
}
ClientContext* context_;
ClientContext* const context_;
Call call_;
::grpc::experimental::ClientWriteReactor<Request>* reactor_;
::grpc::experimental::ClientWriteReactor<Request>* const reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpRecvInitialMetadata> start_ops_;
CallbackWithSuccessTag start_tag_;
@ -867,6 +907,109 @@ class ClientCallbackWriterFactory {
}
};
class ClientCallbackUnaryImpl final
: public ::grpc::experimental::ClientCallbackUnary {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(ClientCallbackUnaryImpl));
}
// This operator should never be called as the memory should be freed as part
// of the arena destruction. It only exists to provide a matching operator
// delete to the operator new so that some compilers will not complain (see
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
void StartCall() override {
// This call initiates two batches, each with a callback
// 1. Send initial metadata + write + writes done + recv initial metadata
// 2. Read message, recv trailing metadata
started_ = true;
start_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnReadInitialMetadataDone(ok);
MaybeFinish();
},
&start_ops_);
start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
context_->initial_metadata_flags());
start_ops_.RecvInitialMetadata(context_);
start_ops_.set_core_cq_tag(&start_tag_);
call_.PerformOps(&start_ops_);
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
&finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
}
void MaybeFinish() {
if (--callbacks_outstanding_ == 0) {
Status s = std::move(finish_status_);
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackUnaryImpl();
g_core_codegen_interface->grpc_call_unref(call);
reactor->OnDone(s);
}
}
private:
friend class ClientCallbackUnaryFactory;
template <class Request, class Response>
ClientCallbackUnaryImpl(Call call, ClientContext* context, Request* request,
Response* response,
::grpc::experimental::ClientUnaryReactor* reactor)
: context_(context), call_(call), reactor_(reactor) {
this->BindReactor(reactor);
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
start_ops_.ClientSendClose();
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
}
ClientContext* const context_;
Call call_;
::grpc::experimental::ClientUnaryReactor* const reactor_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose,
CallOpRecvInitialMetadata>
start_ops_;
CallbackWithSuccessTag start_tag_;
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CallbackWithSuccessTag finish_tag_;
Status finish_status_;
// This call will have 2 callbacks: start and finish
std::atomic_int callbacks_outstanding_{2};
bool started_{false};
};
class ClientCallbackUnaryFactory {
public:
template <class Request, class Response>
static void Create(ChannelInterface* channel,
const ::grpc::internal::RpcMethod& method,
ClientContext* context, const Request* request,
Response* response,
::grpc::experimental::ClientUnaryReactor* reactor) {
Call call = channel->CreateCall(method, context, channel->CallbackCQ());
g_core_codegen_interface->grpc_call_ref(call.call());
new (g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackUnaryImpl)))
ClientCallbackUnaryImpl(call, context, request, response, reactor);
}
};
} // namespace internal
} // namespace grpc

@ -79,6 +79,7 @@ template <class Response>
class ClientCallbackReaderImpl;
template <class Request>
class ClientCallbackWriterImpl;
class ClientCallbackUnaryImpl;
} // namespace internal
template <class R>
@ -417,6 +418,7 @@ class ClientContext {
friend class ::grpc::internal::ClientCallbackReaderImpl;
template <class Request>
friend class ::grpc::internal::ClientCallbackWriterImpl;
friend class ::grpc::internal::ClientCallbackUnaryImpl;
// Used by friend class CallOpClientRecvStatus
void set_debug_error_string(const grpc::string& debug_error_string) {

@ -607,6 +607,14 @@ void PrintHeaderClientMethodCallbackInterfaces(
"virtual void $Method$(::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"std::function<void(::grpc::Status)>) = 0;\n");
printer->Print(*vars,
"virtual void $Method$(::grpc::ClientContext* context, "
"const $Request$* request, $Response$* response, "
"::grpc::experimental::ClientUnaryReactor* reactor) = 0;\n");
printer->Print(*vars,
"virtual void $Method$(::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"::grpc::experimental::ClientUnaryReactor* reactor) = 0;\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"virtual void $Method$(::grpc::ClientContext* context, "
@ -673,6 +681,16 @@ void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer,
"void $Method$(::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"std::function<void(::grpc::Status)>) override;\n");
printer->Print(
*vars,
"void $Method$(::grpc::ClientContext* context, "
"const $Request$* request, $Response$* response, "
"::grpc::experimental::ClientUnaryReactor* reactor) override;\n");
printer->Print(
*vars,
"void $Method$(::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"::grpc::experimental::ClientUnaryReactor* reactor) override;\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"void $Method$(::grpc::ClientContext* context, "
@ -1671,7 +1689,7 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"const $Request$* request, $Response$* response, "
"std::function<void(::grpc::Status)> f) {\n");
printer->Print(*vars,
" return ::grpc::internal::CallbackUnaryCall"
" ::grpc::internal::CallbackUnaryCall"
"(stub_->channel_.get(), stub_->rpcmethod_$Method$_, "
"context, request, response, std::move(f));\n}\n\n");
@ -1681,10 +1699,30 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"const ::grpc::ByteBuffer* request, $Response$* response, "
"std::function<void(::grpc::Status)> f) {\n");
printer->Print(*vars,
" return ::grpc::internal::CallbackUnaryCall"
" ::grpc::internal::CallbackUnaryCall"
"(stub_->channel_.get(), stub_->rpcmethod_$Method$_, "
"context, request, response, std::move(f));\n}\n\n");
printer->Print(*vars,
"void $ns$$Service$::Stub::experimental_async::$Method$("
"::grpc::ClientContext* context, "
"const $Request$* request, $Response$* response, "
"::grpc::experimental::ClientUnaryReactor* reactor) {\n");
printer->Print(*vars,
" ::grpc::internal::ClientCallbackUnaryFactory::Create"
"(stub_->channel_.get(), stub_->rpcmethod_$Method$_, "
"context, request, response, reactor);\n}\n\n");
printer->Print(*vars,
"void $ns$$Service$::Stub::experimental_async::$Method$("
"::grpc::ClientContext* context, "
"const ::grpc::ByteBuffer* request, $Response$* response, "
"::grpc::experimental::ClientUnaryReactor* reactor) {\n");
printer->Print(*vars,
" ::grpc::internal::ClientCallbackUnaryFactory::Create"
"(stub_->channel_.get(), stub_->rpcmethod_$Method$_, "
"context, request, response, reactor);\n}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;

@ -47,6 +47,7 @@ message RequestParams {
ErrorStatus expected_error = 14;
int32 server_sleep_us = 15; // Amount to sleep when invoking server
int32 backend_channel_idx = 16; // which backend to send request to
bool echo_metadata_initially = 17;
}
message EchoRequest {

@ -114,6 +114,8 @@ class ServiceA final {
// MethodA1 leading comment 1
virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
// MethodA1 trailing comment 1
// MethodA2 detached leading comment 1
//
@ -184,6 +186,8 @@ class ServiceA final {
public:
void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
void MethodA1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
void MethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::experimental::ClientWriteReactor< ::grpc::testing::Request>* reactor) override;
void MethodA3(::grpc::ClientContext* context, ::grpc::testing::Request* request, ::grpc::experimental::ClientReadReactor< ::grpc::testing::Response>* reactor) override;
void MethodA4(::grpc::ClientContext* context, ::grpc::experimental::ClientBidiReactor< ::grpc::testing::Request,::grpc::testing::Response>* reactor) override;
@ -717,6 +721,8 @@ class ServiceB final {
// MethodB1 leading comment 1
virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) = 0;
virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0;
// MethodB1 trailing comment 1
};
virtual class experimental_async_interface* experimental_async() { return nullptr; }
@ -739,6 +745,8 @@ class ServiceB final {
public:
void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, std::function<void(::grpc::Status)>) override;
void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
void MethodB1(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::grpc::testing::Response* response, ::grpc::experimental::ClientUnaryReactor* reactor) override;
private:
friend class Stub;
explicit experimental_async(Stub* stub): stub_(stub) { }

@ -696,6 +696,65 @@ TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
}
}
TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
MAYBE_SKIP_TEST;
ResetStub();
class UnaryClient : public grpc::experimental::ClientUnaryReactor {
public:
UnaryClient(grpc::testing::EchoTestService::Stub* stub) {
cli_ctx_.AddMetadata("key1", "val1");
cli_ctx_.AddMetadata("key2", "val2");
request_.mutable_param()->set_echo_metadata_initially(true);
request_.set_message("Hello metadata");
stub->experimental_async()->Echo(&cli_ctx_, &request_, &response_, this);
StartCall();
}
void OnReadInitialMetadataDone(bool ok) override {
EXPECT_TRUE(ok);
EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
EXPECT_EQ(
"val1",
ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
EXPECT_EQ(
"val2",
ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
initial_metadata_done_ = true;
}
void OnDone(const Status& s) override {
EXPECT_TRUE(initial_metadata_done_);
EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
EXPECT_TRUE(s.ok());
EXPECT_EQ(request_.message(), response_.message());
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
EchoRequest request_;
EchoResponse response_;
ClientContext cli_ctx_;
std::mutex mu_;
std::condition_variable cv_;
bool done_{false};
bool initial_metadata_done_{false};
};
UnaryClient test{stub_.get()};
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
public:
ReadClient(grpc::testing::EchoTestService::Stub* stub,

@ -200,6 +200,17 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EXPECT_FALSE(context->IsCancelled());
}
if (request->has_param() && request->param().echo_metadata_initially()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
iter = client_metadata.begin();
iter != client_metadata.end(); ++iter) {
context->AddInitialMetadata(ToString(iter->first),
ToString(iter->second));
}
}
if (request->has_param() && request->param().echo_metadata()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
@ -379,6 +390,18 @@ void CallbackTestServiceImpl::EchoNonDelayed(
EXPECT_FALSE(context->IsCancelled());
}
if (request->has_param() && request->param().echo_metadata_initially()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
iter = client_metadata.begin();
iter != client_metadata.end(); ++iter) {
context->AddInitialMetadata(ToString(iter->first),
ToString(iter->second));
}
controller->SendInitialMetadata([](bool ok) { EXPECT_TRUE(ok); });
}
if (request->has_param() && request->param().echo_metadata()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();

Loading…
Cancel
Save