Merge pull request #18239 from vjpai/callback_async_generic_service

C++: Support callback-based generic service
pull/18289/head
Vijay Pai 6 years ago committed by GitHub
commit 96f8b7a532
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      include/grpcpp/impl/codegen/async_generic_service.h
  2. 10
      include/grpcpp/impl/codegen/server_context.h
  3. 23
      include/grpcpp/impl/codegen/server_interface.h
  4. 42
      include/grpcpp/server.h
  5. 10
      include/grpcpp/server_builder.h
  6. 24
      src/cpp/server/server_builder.cc
  7. 166
      src/cpp/server/server_cc.cc
  8. 122
      test/cpp/end2end/hybrid_end2end_test.cc
  9. 26
      test/cpp/end2end/test_service_impl.cc

@ -21,6 +21,7 @@
#include <grpcpp/impl/codegen/async_stream.h>
#include <grpcpp/impl/codegen/byte_buffer.h>
#include <grpcpp/impl/codegen/server_callback.h>
struct grpc_server;
@ -41,6 +42,12 @@ class GenericServerContext final : public ServerContext {
friend class Server;
friend class ServerInterface;
void Clear() {
method_.clear();
host_.clear();
ServerContext::Clear();
}
grpc::string method_;
grpc::string host_;
};
@ -76,6 +83,50 @@ class AsyncGenericService final {
Server* server_;
};
namespace experimental {
class ServerGenericBidiReactor
: public ServerBidiReactor<ByteBuffer, ByteBuffer> {
public:
void OnStarted(ServerContext* ctx) final {
OnStarted(static_cast<GenericServerContext*>(ctx));
}
virtual void OnStarted(GenericServerContext* ctx) {}
};
} // namespace experimental
namespace internal {
class UnimplementedGenericBidiReactor
: public experimental::ServerGenericBidiReactor {
public:
void OnDone() override { delete this; }
void OnStarted(GenericServerContext*) override {
this->Finish(Status(StatusCode::UNIMPLEMENTED, ""));
}
};
} // namespace internal
namespace experimental {
class CallbackGenericService {
public:
CallbackGenericService() {}
virtual ~CallbackGenericService() {}
virtual ServerGenericBidiReactor* CreateReactor() {
return new internal::UnimplementedGenericBidiReactor;
}
private:
friend class ::grpc::Server;
internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>* Handler() {
return new internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>(
[this] { return CreateReactor(); });
}
Server* server_{nullptr};
};
} // namespace experimental
} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H

@ -43,6 +43,10 @@ struct census_context;
namespace grpc {
class ClientContext;
class GenericServerContext;
class CompletionQueue;
class Server;
class ServerInterface;
template <class W, class R>
class ServerAsyncReader;
template <class W>
@ -55,6 +59,7 @@ template <class R>
class ServerReader;
template <class W>
class ServerWriter;
namespace internal {
template <class W, class R>
class ServerReaderWriterBody;
@ -82,10 +87,6 @@ class Call;
class ServerReactor;
} // namespace internal
class CompletionQueue;
class Server;
class ServerInterface;
namespace testing {
class InteropServerContextInspector;
class ServerContextTestSpouse;
@ -302,6 +303,7 @@ class ServerContext {
template <StatusCode code>
friend class internal::ErrorMethodHandler;
friend class ::grpc::ClientContext;
friend class ::grpc::GenericServerContext;
/// Prevent copying.
ServerContext(const ServerContext&);

@ -47,6 +47,10 @@ namespace internal {
class ServerAsyncStreamingInterface;
} // namespace internal
namespace experimental {
class CallbackGenericService;
} // namespace experimental
class ServerInterface : public internal::CallHook {
public:
virtual ~ServerInterface() {}
@ -115,6 +119,25 @@ class ServerInterface : public internal::CallHook {
/// service. The service must exist for the lifetime of the Server instance.
virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
/// NOTE: class experimental_registration_interface is not part of the public
/// API of this class
/// TODO(vjpai): Move these contents to public API when no longer experimental
class experimental_registration_interface {
public:
virtual ~experimental_registration_interface() {}
/// May not be abstract since this is a post-1.0 API addition
virtual void RegisterCallbackGenericService(
experimental::CallbackGenericService* service) {}
};
/// NOTE: The function experimental_registration() is not stable public API.
/// It is a view to the experimental components of this class. It may be
/// changed or removed at any time. May not be abstract since this is a
/// post-1.0 API addition
virtual experimental_registration_interface* experimental_registration() {
return nullptr;
}
/// Tries to bind \a server to the given \a addr.
///
/// It can be invoked multiple times.

@ -202,6 +202,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
friend class ServerInitializer;
class SyncRequest;
class CallbackRequestBase;
template <class ServerContextType>
class CallbackRequest;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
@ -216,6 +218,34 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
/// service. The service must exist for the lifetime of the Server instance.
void RegisterAsyncGenericService(AsyncGenericService* service) override;
/// NOTE: class experimental_registration_type is not part of the public API
/// of this class
/// TODO(vjpai): Move these contents to the public API of Server when
/// they are no longer experimental
class experimental_registration_type final
: public experimental_registration_interface {
public:
explicit experimental_registration_type(Server* server) : server_(server) {}
void RegisterCallbackGenericService(
experimental::CallbackGenericService* service) override {
server_->RegisterCallbackGenericService(service);
}
private:
Server* server_;
};
/// TODO(vjpai): Mark this override when experimental type above is deleted
void RegisterCallbackGenericService(
experimental::CallbackGenericService* service);
/// NOTE: The function experimental_registration() is not stable public API.
/// It is a view to the experimental components of this class. It may be
/// changed or removed at any time.
experimental_registration_interface* experimental_registration() override {
return &experimental_registration_;
}
void PerformOpsOnCall(internal::CallOpSetInterface* ops,
internal::Call* call) override;
@ -257,7 +287,11 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
std::vector<gpr_atm> callback_unmatched_reqs_count_;
// List of callback requests to start when server actually starts.
std::list<CallbackRequest*> callback_reqs_to_start_;
std::list<CallbackRequestBase*> callback_reqs_to_start_;
// For registering experimental callback generic service; remove when that
// method longer experimental
experimental_registration_type experimental_registration_{this};
// Server status
std::mutex mu_;
@ -281,7 +315,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
std::shared_ptr<GlobalCallbacks> global_callbacks_;
std::vector<grpc::string> services_;
bool has_generic_service_;
bool has_async_generic_service_{false};
bool has_callback_generic_service_{false};
// Pointer to the wrapped grpc_server.
grpc_server* server_;
@ -294,6 +329,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
// A special handler for resource exhausted in sync case
std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;
// Handler for callback generic service, if any
std::unique_ptr<internal::MethodHandler> generic_handler_;
// callback_cq_ references the callbackable completion queue associated
// with this server (if any). It is set on the first call to CallbackCQ().
// It is _not owned_ by the server; ownership belongs with its internal

@ -49,6 +49,10 @@ namespace testing {
class ServerBuilderPluginTest;
} // namespace testing
namespace experimental {
class CallbackGenericService;
} // namespace experimental
/// A builder class for the creation and startup of \a grpc::Server instances.
class ServerBuilder {
public:
@ -227,6 +231,9 @@ class ServerBuilder {
builder_->interceptor_creators_ = std::move(interceptor_creators);
}
ServerBuilder& RegisterCallbackGenericService(
experimental::CallbackGenericService* service);
private:
ServerBuilder* builder_;
};
@ -311,7 +318,8 @@ class ServerBuilder {
std::shared_ptr<ServerCredentials> creds_;
std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_;
grpc_resource_quota* resource_quota_;
AsyncGenericService* generic_service_;
AsyncGenericService* generic_service_{nullptr};
experimental::CallbackGenericService* callback_generic_service_{nullptr};
struct {
bool is_set;
grpc_compression_level level;

@ -44,8 +44,7 @@ ServerBuilder::ServerBuilder()
: max_receive_message_size_(INT_MIN),
max_send_message_size_(INT_MIN),
sync_server_settings_(SyncServerSettings()),
resource_quota_(nullptr),
generic_service_(nullptr) {
resource_quota_(nullptr) {
gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
for (auto it = g_plugin_factory_list->begin();
it != g_plugin_factory_list->end(); it++) {
@ -91,9 +90,9 @@ ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr,
ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
AsyncGenericService* service) {
if (generic_service_) {
if (generic_service_ || callback_generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Adding multiple generic services is unsupported for now. "
"Dropping the service %p",
(void*)service);
} else {
@ -102,6 +101,19 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
return *this;
}
ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
experimental::CallbackGenericService* service) {
if (builder_->generic_service_ || builder_->callback_generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple generic services is unsupported for now. "
"Dropping the service %p",
(void*)service);
} else {
builder_->callback_generic_service_ = service;
}
return *builder_;
}
ServerBuilder& ServerBuilder::SetOption(
std::unique_ptr<ServerBuilderOption> option) {
options_.push_back(std::move(option));
@ -310,7 +322,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
has_frequently_polled_cqs = true;
}
if (has_callback_methods) {
if (has_callback_methods || callback_generic_service_ != nullptr) {
auto* cq = server->CallbackCQ();
grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
}
@ -344,6 +356,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else if (callback_generic_service_) {
server->RegisterCallbackGenericService(callback_generic_service_);
} else {
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_generic_methods()) {

@ -19,6 +19,7 @@
#include <cstdlib>
#include <sstream>
#include <type_traits>
#include <utility>
#include <grpc/grpc.h>
@ -348,8 +349,24 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
grpc_completion_queue* cq_;
};
class Server::CallbackRequest final : public internal::CompletionQueueTag {
class Server::CallbackRequestBase : public internal::CompletionQueueTag {
public:
virtual ~CallbackRequestBase() {}
virtual bool Request() = 0;
};
template <class ServerContextType>
class Server::CallbackRequest final : public Server::CallbackRequestBase {
public:
static_assert(std::is_base_of<ServerContext, ServerContextType>::value,
"ServerContextType must be derived from ServerContext");
// The constructor needs to know the server for this callback request and its
// index in the server's request count array to allow for proper dynamic
// requesting of incoming RPCs. For codegen services, the values of method and
// method_tag represent the defined characteristics of the method being
// requested. For generic services, method and method_tag are nullptr since
// these services don't have pre-defined methods or method registration tags.
CallbackRequest(Server* server, size_t method_idx,
internal::RpcServiceMethod* method, void* method_tag)
: server_(server),
@ -357,8 +374,9 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
method_(method),
method_tag_(method_tag),
has_request_payload_(
method->method_type() == internal::RpcMethod::NORMAL_RPC ||
method->method_type() == internal::RpcMethod::SERVER_STREAMING),
method_ != nullptr &&
(method->method_type() == internal::RpcMethod::NORMAL_RPC ||
method->method_type() == internal::RpcMethod::SERVER_STREAMING)),
cq_(server->CallbackCQ()),
tag_(this) {
server_->callback_reqs_outstanding_++;
@ -376,7 +394,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
}
}
bool Request() {
bool Request() override {
if (method_tag_) {
if (GRPC_CALL_OK !=
grpc_server_request_registered_call(
@ -400,12 +418,18 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
return true;
}
bool FinalizeResult(void** tag, bool* status) override { return false; }
// Needs specialization to account for different processing of metadata
// in generic API
bool FinalizeResult(void** tag, bool* status) override;
private:
// method_name needs to be specialized between named method and generic
const char* method_name() const;
class CallbackCallTag : public grpc_experimental_completion_queue_functor {
public:
CallbackCallTag(Server::CallbackRequest* req) : req_(req) {
CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
: req_(req) {
functor_run = &CallbackCallTag::StaticRun;
}
@ -415,7 +439,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
void force_run(bool ok) { Run(ok); }
private:
Server::CallbackRequest* req_;
Server::CallbackRequest<ServerContextType>* req_;
internal::Call* call_;
static void StaticRun(grpc_experimental_completion_queue_functor* cb,
@ -446,8 +470,9 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
if (count == 0 || (count < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
auto* new_req = new CallbackRequest(req_->server_, req_->method_index_,
req_->method_, req_->method_tag_);
auto* new_req = new CallbackRequest<ServerContextType>(
req_->server_, req_->method_index_, req_->method_,
req_->method_tag_);
if (!new_req->Request()) {
// The server must have just decided to shutdown.
gpr_atm_no_barrier_fetch_add(
@ -467,12 +492,14 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
// Create a C++ Call to control the underlying core call
call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(internal::Call)))
internal::Call(
req_->call_, req_->server_, req_->cq_,
req_->server_->max_receive_message_size(),
req_->ctx_.set_server_rpc_info(
req_->method_->name(), req_->method_->method_type(),
req_->server_->interceptor_creators_));
internal::Call(req_->call_, req_->server_, req_->cq_,
req_->server_->max_receive_message_size(),
req_->ctx_.set_server_rpc_info(
req_->method_name(),
(req_->method_ != nullptr)
? req_->method_->method_type()
: internal::RpcMethod::BIDI_STREAMING,
req_->server_->interceptor_creators_));
req_->interceptor_methods_.SetCall(call_);
req_->interceptor_methods_.SetReverse();
@ -501,31 +528,32 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
}
}
void ContinueRunAfterInterception() {
req_->method_->handler()->RunHandler(
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
[this] {
// Recycle this request if there aren't too many outstanding.
// Note that we don't have to worry about a case where there
// are no requests waiting to match for this method since that
// is already taken care of when binding a request to a call.
// TODO(vjpai): Also don't recycle this request if the dynamic
// load no longer justifies it. Consider measuring
// dynamic load and setting a target accordingly.
if (req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
req_->Clear();
req_->Setup();
} else {
// We can free up this request because there are too many
delete req_;
return;
}
if (!req_->Request()) {
// The server must have just decided to shutdown.
delete req_;
}
}));
auto* handler = (req_->method_ != nullptr)
? req_->method_->handler()
: req_->server_->generic_handler_.get();
handler->RunHandler(internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_, [this] {
// Recycle this request if there aren't too many outstanding.
// Note that we don't have to worry about a case where there
// are no requests waiting to match for this method since that
// is already taken care of when binding a request to a call.
// TODO(vjpai): Also don't recycle this request if the dynamic
// load no longer justifies it. Consider measuring
// dynamic load and setting a target accordingly.
if (req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
req_->Clear();
req_->Setup();
} else {
// We can free up this request because there are too many
delete req_;
return;
}
if (!req_->Request()) {
// The server must have just decided to shutdown.
delete req_;
}
}));
}
};
@ -553,7 +581,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
}
Server* const server_;
size_t method_index_;
const size_t method_index_;
internal::RpcServiceMethod* const method_;
void* const method_tag_;
const bool has_request_payload_;
@ -566,10 +594,39 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
grpc_metadata_array request_metadata_;
CompletionQueue* cq_;
CallbackCallTag tag_;
ServerContext ctx_;
ServerContextType ctx_;
internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
template <>
bool Server::CallbackRequest<ServerContext>::FinalizeResult(void** tag,
bool* status) {
return false;
}
template <>
bool Server::CallbackRequest<GenericServerContext>::FinalizeResult(
void** tag, bool* status) {
if (*status) {
// TODO(yangg) remove the copy here
ctx_.method_ = StringFromCopiedSlice(call_details_->method);
ctx_.host_ = StringFromCopiedSlice(call_details_->host);
}
grpc_slice_unref(call_details_->method);
grpc_slice_unref(call_details_->host);
return false;
}
template <>
const char* Server::CallbackRequest<ServerContext>::method_name() const {
return method_->name();
}
template <>
const char* Server::CallbackRequest<GenericServerContext>::method_name() const {
return ctx_.method().c_str();
}
// Implementation of ThreadManager. Each instance of SyncRequestThreadManager
// manages a pool of threads that poll for incoming Sync RPCs and call the
// appropriate RPC handlers
@ -708,7 +765,6 @@ Server::Server(
started_(false),
shutdown_(false),
shutdown_notified_(false),
has_generic_service_(false),
server_(nullptr),
server_initializer_(new ServerInitializer(this)),
health_check_service_disabled_(false) {
@ -865,7 +921,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
auto method_index = callback_unmatched_reqs_count_.size() - 1;
// TODO(vjpai): Register these dynamically based on need
for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
callback_reqs_to_start_.push_back(new CallbackRequest(
callback_reqs_to_start_.push_back(new CallbackRequest<ServerContext>(
this, method_index, method, method_registration_tag));
}
// Enqueue it so that it will be Request'ed later after all request
@ -891,7 +947,25 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an async generic service against one server.");
service->server_ = this;
has_generic_service_ = true;
has_async_generic_service_ = true;
}
void Server::RegisterCallbackGenericService(
experimental::CallbackGenericService* service) {
GPR_ASSERT(
service->server_ == nullptr &&
"Can only register a callback generic service against one server.");
service->server_ = this;
has_callback_generic_service_ = true;
generic_handler_.reset(service->Handler());
callback_unmatched_reqs_count_.push_back(0);
auto method_index = callback_unmatched_reqs_count_.size() - 1;
// TODO(vjpai): Register these dynamically based on need
for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
callback_reqs_to_start_.push_back(new CallbackRequest<GenericServerContext>(
this, method_index, nullptr, nullptr));
}
}
int Server::AddListeningPort(const grpc::string& addr,
@ -932,7 +1006,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
grpc_server_start(server_);
if (!has_generic_service_) {
if (!has_async_generic_service_ && !has_callback_generic_service_) {
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->AddUnknownSyncMethod();
}

@ -28,6 +28,7 @@
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include "src/core/lib/iomgr/iomgr.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@ -39,7 +40,6 @@
namespace grpc {
namespace testing {
namespace {
void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
@ -225,13 +225,23 @@ class TestServiceImplDupPkg
}
};
class HybridEnd2endTest : public ::testing::Test {
class HybridEnd2endTest : public ::testing::TestWithParam<bool> {
protected:
HybridEnd2endTest() {}
void SetUpServer(::grpc::Service* service1, ::grpc::Service* service2,
AsyncGenericService* generic_service,
int max_message_size = 0) {
void SetUp() override {
inproc_ = (::testing::UnitTest::GetInstance()
->current_test_info()
->value_param() != nullptr)
? GetParam()
: false;
}
bool SetUpServer(
::grpc::Service* service1, ::grpc::Service* service2,
AsyncGenericService* generic_service,
experimental::CallbackGenericService* callback_generic_service,
int max_message_size = 0) {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
@ -249,6 +259,10 @@ class HybridEnd2endTest : public ::testing::Test {
if (generic_service) {
builder.RegisterAsyncGenericService(generic_service);
}
if (callback_generic_service) {
builder.experimental().RegisterCallbackGenericService(
callback_generic_service);
}
if (max_message_size != 0) {
builder.SetMaxMessageSize(max_message_size);
@ -259,6 +273,11 @@ class HybridEnd2endTest : public ::testing::Test {
cqs_.push_back(builder.AddCompletionQueue(false));
}
server_ = builder.BuildAndStart();
// If there is a generic callback service, this setup is only successful if
// we have an iomgr that can run in the background or are inprocess
return !callback_generic_service || grpc_iomgr_run_in_background() ||
inproc_;
}
void TearDown() override {
@ -276,7 +295,9 @@ class HybridEnd2endTest : public ::testing::Test {
void ResetStub() {
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
inproc_ ? server_->InProcessChannel(ChannelArguments())
: CreateChannel(server_address_.str(),
InsecureChannelCredentials());
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
@ -411,12 +432,13 @@ class HybridEnd2endTest : public ::testing::Test {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
bool inproc_;
};
TEST_F(HybridEnd2endTest, AsyncEcho) {
typedef EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> SType;
SType service;
SetUpServer(&service, nullptr, nullptr);
SetUpServer(&service, nullptr, nullptr, nullptr);
ResetStub();
std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
false);
@ -427,7 +449,7 @@ TEST_F(HybridEnd2endTest, AsyncEcho) {
TEST_F(HybridEnd2endTest, RawEcho) {
typedef EchoTestService::WithRawMethod_Echo<TestServiceImpl> SType;
SType service;
SetUpServer(&service, nullptr, nullptr);
SetUpServer(&service, nullptr, nullptr, nullptr);
ResetStub();
std::thread echo_handler_thread(HandleRawEcho<SType>, &service, cqs_[0].get(),
false);
@ -438,7 +460,7 @@ TEST_F(HybridEnd2endTest, RawEcho) {
TEST_F(HybridEnd2endTest, RawRequestStream) {
typedef EchoTestService::WithRawMethod_RequestStream<TestServiceImpl> SType;
SType service;
SetUpServer(&service, nullptr, nullptr);
SetUpServer(&service, nullptr, nullptr, nullptr);
ResetStub();
std::thread request_stream_handler_thread(HandleRawClientStreaming<SType>,
&service, cqs_[0].get());
@ -451,7 +473,7 @@ TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) {
EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
SType;
SType service;
SetUpServer(&service, nullptr, nullptr);
SetUpServer(&service, nullptr, nullptr, nullptr);
ResetStub();
std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
false);
@ -468,7 +490,7 @@ TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) {
SType;
SType service;
AsyncGenericService generic_service;
SetUpServer(&service, nullptr, &generic_service);
SetUpServer(&service, nullptr, &generic_service, nullptr);
ResetStub();
std::thread generic_handler_thread(HandleGenericCall, &generic_service,
cqs_[0].get());
@ -484,7 +506,7 @@ TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
EchoTestService::WithAsyncMethod_Echo<TestServiceImpl>>
SType;
SType service;
SetUpServer(&service, nullptr, nullptr);
SetUpServer(&service, nullptr, nullptr, nullptr);
ResetStub();
std::thread echo_handler_thread(HandleEcho<SType>, &service, cqs_[0].get(),
false);
@ -500,7 +522,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>
SType;
SType service;
SetUpServer(&service, nullptr, nullptr);
SetUpServer(&service, nullptr, nullptr, nullptr);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -518,7 +540,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) {
SType;
SType service;
TestServiceImplDupPkg dup_service;
SetUpServer(&service, &dup_service, nullptr);
SetUpServer(&service, &dup_service, nullptr, nullptr);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -557,7 +579,7 @@ TEST_F(HybridEnd2endTest,
SType;
SType service;
StreamedUnaryDupPkg dup_service;
SetUpServer(&service, &dup_service, nullptr, 8192);
SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -595,7 +617,7 @@ TEST_F(HybridEnd2endTest,
SType;
SType service;
FullyStreamedUnaryDupPkg dup_service;
SetUpServer(&service, &dup_service, nullptr, 8192);
SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -636,7 +658,7 @@ TEST_F(HybridEnd2endTest,
SType;
SType service;
SplitResponseStreamDupPkg dup_service;
SetUpServer(&service, &dup_service, nullptr, 8192);
SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -676,7 +698,7 @@ TEST_F(HybridEnd2endTest,
SType;
SType service;
FullySplitStreamedDupPkg dup_service;
SetUpServer(&service, &dup_service, nullptr, 8192);
SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -728,7 +750,7 @@ TEST_F(HybridEnd2endTest,
SType;
SType service;
FullyStreamedDupPkg dup_service;
SetUpServer(&service, &dup_service, nullptr, 8192);
SetUpServer(&service, &dup_service, nullptr, nullptr, 8192);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -748,7 +770,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
SType;
SType service;
duplicate::EchoTestService::AsyncService dup_service;
SetUpServer(&service, &dup_service, nullptr);
SetUpServer(&service, &dup_service, nullptr, nullptr);
ResetStub();
std::thread response_stream_handler_thread(HandleServerStreaming<SType>,
&service, cqs_[0].get());
@ -767,7 +789,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) {
TEST_F(HybridEnd2endTest, GenericEcho) {
EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
AsyncGenericService generic_service;
SetUpServer(&service, nullptr, &generic_service);
SetUpServer(&service, nullptr, &generic_service, nullptr);
ResetStub();
std::thread generic_handler_thread(HandleGenericCall, &generic_service,
cqs_[0].get());
@ -775,13 +797,56 @@ TEST_F(HybridEnd2endTest, GenericEcho) {
generic_handler_thread.join();
}
TEST_P(HybridEnd2endTest, CallbackGenericEcho) {
EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
class GenericEchoService : public experimental::CallbackGenericService {
private:
experimental::ServerGenericBidiReactor* CreateReactor() override {
class Reactor : public experimental::ServerGenericBidiReactor {
private:
void OnStarted(GenericServerContext* ctx) override {
ctx_ = ctx;
EXPECT_EQ(ctx->method(), "/grpc.testing.EchoTestService/Echo");
StartRead(&request_);
}
void OnDone() override { delete this; }
void OnReadDone(bool ok) override {
if (!ok) {
EXPECT_EQ(reads_complete_, 1);
} else {
EXPECT_EQ(reads_complete_++, 0);
response_ = request_;
StartWrite(&response_);
StartRead(&request_);
}
}
void OnWriteDone(bool ok) override {
Finish(ok ? Status::OK
: Status(StatusCode::UNKNOWN, "Unexpected failure"));
}
GenericServerContext* ctx_;
ByteBuffer request_;
ByteBuffer response_;
std::atomic_int reads_complete_{0};
};
return new Reactor;
}
} generic_service;
if (!SetUpServer(&service, nullptr, nullptr, &generic_service)) {
return;
}
ResetStub();
TestAllMethods();
}
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
typedef EchoTestService::WithAsyncMethod_RequestStream<
EchoTestService::WithGenericMethod_Echo<TestServiceImpl>>
SType;
SType service;
AsyncGenericService generic_service;
SetUpServer(&service, nullptr, &generic_service);
SetUpServer(&service, nullptr, &generic_service, nullptr);
ResetStub();
std::thread generic_handler_thread(HandleGenericCall, &generic_service,
cqs_[0].get());
@ -800,7 +865,7 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) {
SType service;
AsyncGenericService generic_service;
TestServiceImplDupPkg dup_service;
SetUpServer(&service, &dup_service, &generic_service);
SetUpServer(&service, &dup_service, &generic_service, nullptr);
ResetStub();
std::thread generic_handler_thread(HandleGenericCall, &generic_service,
cqs_[0].get());
@ -820,7 +885,7 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) {
SType service;
AsyncGenericService generic_service;
duplicate::EchoTestService::AsyncService dup_service;
SetUpServer(&service, &dup_service, &generic_service);
SetUpServer(&service, &dup_service, &generic_service, nullptr);
ResetStub();
std::thread generic_handler_thread(HandleGenericCall, &generic_service,
cqs_[0].get());
@ -843,7 +908,7 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
SType;
SType service;
AsyncGenericService generic_service;
SetUpServer(&service, nullptr, &generic_service);
SetUpServer(&service, nullptr, &generic_service, nullptr);
ResetStub();
std::thread generic_handler_thread(HandleGenericCall, &generic_service,
cqs_[0].get());
@ -864,7 +929,7 @@ TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) {
SType;
SType service;
AsyncGenericService generic_service;
SetUpServer(&service, nullptr, &generic_service);
SetUpServer(&service, nullptr, &generic_service, nullptr);
ResetStub();
std::thread generic_handler_thread(HandleGenericCall, &generic_service,
cqs_[0].get());
@ -885,10 +950,13 @@ TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) {
EchoTestService::WithGenericMethod_Echo<
EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>>>
service;
SetUpServer(&service, nullptr, nullptr);
SetUpServer(&service, nullptr, nullptr, nullptr);
EXPECT_EQ(nullptr, server_.get());
}
INSTANTIATE_TEST_CASE_P(HybridEnd2endTest, HybridEnd2endTest,
::testing::Bool());
} // namespace
} // namespace testing
} // namespace grpc

@ -125,6 +125,19 @@ void ServerTryCancelNonblocking(ServerContext* context) {
gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
}
void LoopUntilCancelled(Alarm* alarm, ServerContext* context,
experimental::ServerCallbackRpcController* controller) {
if (!context->IsCancelled()) {
alarm->experimental().Set(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(1000, GPR_TIMESPAN)),
[alarm, context, controller](bool) {
LoopUntilCancelled(alarm, context, controller);
});
} else {
controller->Finish(Status::CANCELLED);
}
}
} // namespace
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
@ -290,18 +303,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
// Now wait until it's really canceled
std::function<void(bool)> recurrence = [this, context, controller,
&recurrence](bool) {
if (!context->IsCancelled()) {
alarm_.experimental().Set(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(1000, GPR_TIMESPAN)),
recurrence);
} else {
controller->Finish(Status::CANCELLED);
}
};
recurrence(true);
LoopUntilCancelled(&alarm_, context, controller);
return;
}

Loading…
Cancel
Save