diff --git a/include/grpcpp/impl/codegen/async_generic_service.h b/include/grpcpp/impl/codegen/async_generic_service.h index 2a0e1b40881..46489b135d7 100644 --- a/include/grpcpp/impl/codegen/async_generic_service.h +++ b/include/grpcpp/impl/codegen/async_generic_service.h @@ -21,6 +21,7 @@ #include #include +#include struct grpc_server; @@ -41,6 +42,12 @@ class GenericServerContext final : public ServerContext { friend class Server; friend class ServerInterface; + void Clear() { + method_.clear(); + host_.clear(); + ServerContext::Clear(); + } + grpc::string method_; grpc::string host_; }; @@ -76,6 +83,50 @@ class AsyncGenericService final { Server* server_; }; +namespace experimental { + +class ServerGenericBidiReactor + : public ServerBidiReactor { + public: + void OnStarted(ServerContext* ctx) final { + OnStarted(static_cast(ctx)); + } + virtual void OnStarted(GenericServerContext* ctx) {} +}; + +} // namespace experimental + +namespace internal { +class UnimplementedGenericBidiReactor + : public experimental::ServerGenericBidiReactor { + public: + void OnDone() override { delete this; } + void OnStarted(GenericServerContext*) override { + this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); + } +}; +} // namespace internal + +namespace experimental { +class CallbackGenericService { + public: + CallbackGenericService() {} + virtual ~CallbackGenericService() {} + virtual ServerGenericBidiReactor* CreateReactor() { + return new internal::UnimplementedGenericBidiReactor; + } + + private: + friend class ::grpc::Server; + + internal::CallbackBidiHandler* Handler() { + return new internal::CallbackBidiHandler( + [this] { return CreateReactor(); }); + } + + Server* server_{nullptr}; +}; +} // namespace experimental } // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index affe61b547b..fb82186d69e 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -43,6 +43,10 @@ struct census_context; namespace grpc { class ClientContext; +class GenericServerContext; +class CompletionQueue; +class Server; +class ServerInterface; template class ServerAsyncReader; template @@ -55,6 +59,7 @@ template class ServerReader; template class ServerWriter; + namespace internal { template class ServerReaderWriterBody; @@ -82,10 +87,6 @@ class Call; class ServerReactor; } // namespace internal -class CompletionQueue; -class Server; -class ServerInterface; - namespace testing { class InteropServerContextInspector; class ServerContextTestSpouse; @@ -302,6 +303,7 @@ class ServerContext { template friend class internal::ErrorMethodHandler; friend class ::grpc::ClientContext; + friend class ::grpc::GenericServerContext; /// Prevent copying. ServerContext(const ServerContext&); diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index 890a5650d02..f599e037fd5 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -47,6 +47,10 @@ namespace internal { class ServerAsyncStreamingInterface; } // namespace internal +namespace experimental { +class CallbackGenericService; +} // namespace experimental + class ServerInterface : public internal::CallHook { public: virtual ~ServerInterface() {} @@ -115,6 +119,25 @@ class ServerInterface : public internal::CallHook { /// service. The service must exist for the lifetime of the Server instance. virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; + /// NOTE: class experimental_registration_interface is not part of the public + /// API of this class + /// TODO(vjpai): Move these contents to public API when no longer experimental + class experimental_registration_interface { + public: + virtual ~experimental_registration_interface() {} + /// May not be abstract since this is a post-1.0 API addition + virtual void RegisterCallbackGenericService( + experimental::CallbackGenericService* service) {} + }; + + /// NOTE: The function experimental_registration() is not stable public API. + /// It is a view to the experimental components of this class. It may be + /// changed or removed at any time. May not be abstract since this is a + /// post-1.0 API addition + virtual experimental_registration_interface* experimental_registration() { + return nullptr; + } + /// Tries to bind \a server to the given \a addr. /// /// It can be invoked multiple times. diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 248f20452a5..21c908aebdb 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -202,6 +202,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { friend class ServerInitializer; class SyncRequest; + class CallbackRequestBase; + template class CallbackRequest; class UnimplementedAsyncRequest; class UnimplementedAsyncResponse; @@ -216,6 +218,34 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { /// service. The service must exist for the lifetime of the Server instance. void RegisterAsyncGenericService(AsyncGenericService* service) override; + /// NOTE: class experimental_registration_type is not part of the public API + /// of this class + /// TODO(vjpai): Move these contents to the public API of Server when + /// they are no longer experimental + class experimental_registration_type final + : public experimental_registration_interface { + public: + explicit experimental_registration_type(Server* server) : server_(server) {} + void RegisterCallbackGenericService( + experimental::CallbackGenericService* service) override { + server_->RegisterCallbackGenericService(service); + } + + private: + Server* server_; + }; + + /// TODO(vjpai): Mark this override when experimental type above is deleted + void RegisterCallbackGenericService( + experimental::CallbackGenericService* service); + + /// NOTE: The function experimental_registration() is not stable public API. + /// It is a view to the experimental components of this class. It may be + /// changed or removed at any time. + experimental_registration_interface* experimental_registration() override { + return &experimental_registration_; + } + void PerformOpsOnCall(internal::CallOpSetInterface* ops, internal::Call* call) override; @@ -257,7 +287,11 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { std::vector callback_unmatched_reqs_count_; // List of callback requests to start when server actually starts. - std::list callback_reqs_to_start_; + std::list callback_reqs_to_start_; + + // For registering experimental callback generic service; remove when that + // method longer experimental + experimental_registration_type experimental_registration_{this}; // Server status std::mutex mu_; @@ -281,7 +315,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { std::shared_ptr global_callbacks_; std::vector services_; - bool has_generic_service_; + bool has_async_generic_service_{false}; + bool has_callback_generic_service_{false}; // Pointer to the wrapped grpc_server. grpc_server* server_; @@ -294,6 +329,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { // A special handler for resource exhausted in sync case std::unique_ptr resource_exhausted_handler_; + // Handler for callback generic service, if any + std::unique_ptr generic_handler_; + // callback_cq_ references the callbackable completion queue associated // with this server (if any). It is set on the first call to CallbackCQ(). // It is _not owned_ by the server; ownership belongs with its internal diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index 028b8cffaa7..498e5b7bb31 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -49,6 +49,10 @@ namespace testing { class ServerBuilderPluginTest; } // namespace testing +namespace experimental { +class CallbackGenericService; +} // namespace experimental + /// A builder class for the creation and startup of \a grpc::Server instances. class ServerBuilder { public: @@ -227,6 +231,9 @@ class ServerBuilder { builder_->interceptor_creators_ = std::move(interceptor_creators); } + ServerBuilder& RegisterCallbackGenericService( + experimental::CallbackGenericService* service); + private: ServerBuilder* builder_; }; @@ -311,7 +318,8 @@ class ServerBuilder { std::shared_ptr creds_; std::vector> plugins_; grpc_resource_quota* resource_quota_; - AsyncGenericService* generic_service_; + AsyncGenericService* generic_service_{nullptr}; + experimental::CallbackGenericService* callback_generic_service_{nullptr}; struct { bool is_set; grpc_compression_level level; diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index b7fad558abb..cd0e516d9a3 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -44,8 +44,7 @@ ServerBuilder::ServerBuilder() : max_receive_message_size_(INT_MIN), max_send_message_size_(INT_MIN), sync_server_settings_(SyncServerSettings()), - resource_quota_(nullptr), - generic_service_(nullptr) { + resource_quota_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); for (auto it = g_plugin_factory_list->begin(); it != g_plugin_factory_list->end(); it++) { @@ -91,9 +90,9 @@ ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr, ServerBuilder& ServerBuilder::RegisterAsyncGenericService( AsyncGenericService* service) { - if (generic_service_) { + if (generic_service_ || callback_generic_service_) { gpr_log(GPR_ERROR, - "Adding multiple AsyncGenericService is unsupported for now. " + "Adding multiple generic services is unsupported for now. " "Dropping the service %p", (void*)service); } else { @@ -102,6 +101,19 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( return *this; } +ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService( + experimental::CallbackGenericService* service) { + if (builder_->generic_service_ || builder_->callback_generic_service_) { + gpr_log(GPR_ERROR, + "Adding multiple generic services is unsupported for now. " + "Dropping the service %p", + (void*)service); + } else { + builder_->callback_generic_service_ = service; + } + return *builder_; +} + ServerBuilder& ServerBuilder::SetOption( std::unique_ptr option) { options_.push_back(std::move(option)); @@ -310,7 +322,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { has_frequently_polled_cqs = true; } - if (has_callback_methods) { + if (has_callback_methods || callback_generic_service_ != nullptr) { auto* cq = server->CallbackCQ(); grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); } @@ -344,6 +356,8 @@ std::unique_ptr ServerBuilder::BuildAndStart() { if (generic_service_) { server->RegisterAsyncGenericService(generic_service_); + } else if (callback_generic_service_) { + server->RegisterCallbackGenericService(callback_generic_service_); } else { for (auto it = services_.begin(); it != services_.end(); ++it) { if ((*it)->service->has_generic_methods()) { diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 7eb0f2372b6..4d5c8179fce 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -348,8 +349,18 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { grpc_completion_queue* cq_; }; -class Server::CallbackRequest final : public internal::CompletionQueueTag { +class Server::CallbackRequestBase : public internal::CompletionQueueTag { public: + virtual ~CallbackRequestBase() {} + virtual bool Request() = 0; +}; + +template +class Server::CallbackRequest final : public Server::CallbackRequestBase { + public: + static_assert(std::is_base_of::value, + "ServerContextType must be derived from ServerContext"); + CallbackRequest(Server* server, size_t method_idx, internal::RpcServiceMethod* method, void* method_tag) : server_(server), @@ -357,8 +368,9 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { method_(method), method_tag_(method_tag), has_request_payload_( - method->method_type() == internal::RpcMethod::NORMAL_RPC || - method->method_type() == internal::RpcMethod::SERVER_STREAMING), + method_ != nullptr && + (method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING)), cq_(server->CallbackCQ()), tag_(this) { server_->callback_reqs_outstanding_++; @@ -376,7 +388,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } } - bool Request() { + bool Request() override { if (method_tag_) { if (GRPC_CALL_OK != grpc_server_request_registered_call( @@ -400,12 +412,18 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { return true; } - bool FinalizeResult(void** tag, bool* status) override { return false; } + // Needs specialization to account for different processing of metadata + // in generic API + bool FinalizeResult(void** tag, bool* status) override; private: + // method_name needs to be specialized between named method and generic + const char* method_name() const; + class CallbackCallTag : public grpc_experimental_completion_queue_functor { public: - CallbackCallTag(Server::CallbackRequest* req) : req_(req) { + CallbackCallTag(Server::CallbackRequest* req) + : req_(req) { functor_run = &CallbackCallTag::StaticRun; } @@ -415,7 +433,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { void force_run(bool ok) { Run(ok); } private: - Server::CallbackRequest* req_; + Server::CallbackRequest* req_; internal::Call* call_; static void StaticRun(grpc_experimental_completion_queue_functor* cb, @@ -446,8 +464,9 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { if (count == 0 || (count < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD && req_->server_->callback_reqs_outstanding_ < SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) { - auto* new_req = new CallbackRequest(req_->server_, req_->method_index_, - req_->method_, req_->method_tag_); + auto* new_req = new CallbackRequest( + req_->server_, req_->method_index_, req_->method_, + req_->method_tag_); if (!new_req->Request()) { // The server must have just decided to shutdown. gpr_atm_no_barrier_fetch_add( @@ -467,12 +486,14 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { // Create a C++ Call to control the underlying core call call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(internal::Call))) - internal::Call( - req_->call_, req_->server_, req_->cq_, - req_->server_->max_receive_message_size(), - req_->ctx_.set_server_rpc_info( - req_->method_->name(), req_->method_->method_type(), - req_->server_->interceptor_creators_)); + internal::Call(req_->call_, req_->server_, req_->cq_, + req_->server_->max_receive_message_size(), + req_->ctx_.set_server_rpc_info( + req_->method_name(), + (req_->method_ != nullptr) + ? req_->method_->method_type() + : internal::RpcMethod::BIDI_STREAMING, + req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); @@ -501,31 +522,32 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } } void ContinueRunAfterInterception() { - req_->method_->handler()->RunHandler( - internal::MethodHandler::HandlerParameter( - call_, &req_->ctx_, req_->request_, req_->request_status_, - [this] { - // Recycle this request if there aren't too many outstanding. - // Note that we don't have to worry about a case where there - // are no requests waiting to match for this method since that - // is already taken care of when binding a request to a call. - // TODO(vjpai): Also don't recycle this request if the dynamic - // load no longer justifies it. Consider measuring - // dynamic load and setting a target accordingly. - if (req_->server_->callback_reqs_outstanding_ < - SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) { - req_->Clear(); - req_->Setup(); - } else { - // We can free up this request because there are too many - delete req_; - return; - } - if (!req_->Request()) { - // The server must have just decided to shutdown. - delete req_; - } - })); + auto* handler = (req_->method_ != nullptr) + ? req_->method_->handler() + : req_->server_->generic_handler_.get(); + handler->RunHandler(internal::MethodHandler::HandlerParameter( + call_, &req_->ctx_, req_->request_, req_->request_status_, [this] { + // Recycle this request if there aren't too many outstanding. + // Note that we don't have to worry about a case where there + // are no requests waiting to match for this method since that + // is already taken care of when binding a request to a call. + // TODO(vjpai): Also don't recycle this request if the dynamic + // load no longer justifies it. Consider measuring + // dynamic load and setting a target accordingly. + if (req_->server_->callback_reqs_outstanding_ < + SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) { + req_->Clear(); + req_->Setup(); + } else { + // We can free up this request because there are too many + delete req_; + return; + } + if (!req_->Request()) { + // The server must have just decided to shutdown. + delete req_; + } + })); } }; @@ -553,7 +575,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } Server* const server_; - size_t method_index_; + const size_t method_index_; internal::RpcServiceMethod* const method_; void* const method_tag_; const bool has_request_payload_; @@ -566,10 +588,39 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { grpc_metadata_array request_metadata_; CompletionQueue* cq_; CallbackCallTag tag_; - ServerContext ctx_; + ServerContextType ctx_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; +template <> +bool Server::CallbackRequest::FinalizeResult(void** tag, + bool* status) { + return false; +} + +template <> +bool Server::CallbackRequest::FinalizeResult( + void** tag, bool* status) { + if (*status) { + // TODO(yangg) remove the copy here + ctx_.method_ = StringFromCopiedSlice(call_details_->method); + ctx_.host_ = StringFromCopiedSlice(call_details_->host); + } + grpc_slice_unref(call_details_->method); + grpc_slice_unref(call_details_->host); + return false; +} + +template <> +const char* Server::CallbackRequest::method_name() const { + return method_->name(); +} + +template <> +const char* Server::CallbackRequest::method_name() const { + return ctx_.method().c_str(); +} + // Implementation of ThreadManager. Each instance of SyncRequestThreadManager // manages a pool of threads that poll for incoming Sync RPCs and call the // appropriate RPC handlers @@ -708,7 +759,6 @@ Server::Server( started_(false), shutdown_(false), shutdown_notified_(false), - has_generic_service_(false), server_(nullptr), server_initializer_(new ServerInitializer(this)), health_check_service_disabled_(false) { @@ -865,7 +915,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { auto method_index = callback_unmatched_reqs_count_.size() - 1; // TODO(vjpai): Register these dynamically based on need for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - callback_reqs_to_start_.push_back(new CallbackRequest( + callback_reqs_to_start_.push_back(new CallbackRequest( this, method_index, method, method_registration_tag)); } // Enqueue it so that it will be Request'ed later after all request @@ -891,7 +941,25 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; - has_generic_service_ = true; + has_async_generic_service_ = true; +} + +void Server::RegisterCallbackGenericService( + experimental::CallbackGenericService* service) { + GPR_ASSERT( + service->server_ == nullptr && + "Can only register a callback generic service against one server."); + service->server_ = this; + has_callback_generic_service_ = true; + generic_handler_.reset(service->Handler()); + + callback_unmatched_reqs_count_.push_back(0); + auto method_index = callback_unmatched_reqs_count_.size() - 1; + // TODO(vjpai): Register these dynamically based on need + for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { + callback_reqs_to_start_.push_back(new CallbackRequest( + this, method_index, nullptr, nullptr)); + } } int Server::AddListeningPort(const grpc::string& addr, @@ -932,7 +1000,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { grpc_server_start(server_); - if (!has_generic_service_) { + if (!has_async_generic_service_ && !has_callback_generic_service_) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->AddUnknownSyncMethod(); } diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 18bb1ff4b96..b0dd901cf11 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -28,6 +28,7 @@ #include #include +#include "src/core/lib/iomgr/iomgr.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -39,7 +40,6 @@ namespace grpc { namespace testing { - namespace { void* tag(int i) { return (void*)static_cast(i); } @@ -225,13 +225,23 @@ class TestServiceImplDupPkg } }; -class HybridEnd2endTest : public ::testing::Test { +class HybridEnd2endTest : public ::testing::TestWithParam { protected: HybridEnd2endTest() {} - void SetUpServer(::grpc::Service* service1, ::grpc::Service* service2, - AsyncGenericService* generic_service, - int max_message_size = 0) { + void SetUp() override { + inproc_ = (::testing::UnitTest::GetInstance() + ->current_test_info() + ->value_param() != nullptr) + ? GetParam() + : false; + } + + bool SetUpServer( + ::grpc::Service* service1, ::grpc::Service* service2, + AsyncGenericService* generic_service, + experimental::CallbackGenericService* callback_generic_service, + int max_message_size = 0) { int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; @@ -249,6 +259,10 @@ class HybridEnd2endTest : public ::testing::Test { if (generic_service) { builder.RegisterAsyncGenericService(generic_service); } + if (callback_generic_service) { + builder.experimental().RegisterCallbackGenericService( + callback_generic_service); + } if (max_message_size != 0) { builder.SetMaxMessageSize(max_message_size); @@ -259,6 +273,11 @@ class HybridEnd2endTest : public ::testing::Test { cqs_.push_back(builder.AddCompletionQueue(false)); } server_ = builder.BuildAndStart(); + + // If there is a generic callback service, this setup is only successful if + // we have an iomgr that can run in the background or are inprocess + return !callback_generic_service || grpc_iomgr_run_in_background() || + inproc_; } void TearDown() override { @@ -276,7 +295,9 @@ class HybridEnd2endTest : public ::testing::Test { void ResetStub() { std::shared_ptr channel = - CreateChannel(server_address_.str(), InsecureChannelCredentials()); + inproc_ ? server_->InProcessChannel(ChannelArguments()) + : CreateChannel(server_address_.str(), + InsecureChannelCredentials()); stub_ = grpc::testing::EchoTestService::NewStub(channel); } @@ -411,12 +432,13 @@ class HybridEnd2endTest : public ::testing::Test { std::unique_ptr stub_; std::unique_ptr server_; std::ostringstream server_address_; + bool inproc_; }; TEST_F(HybridEnd2endTest, AsyncEcho) { typedef EchoTestService::WithAsyncMethod_Echo SType; SType service; - SetUpServer(&service, nullptr, nullptr); + SetUpServer(&service, nullptr, nullptr, nullptr); ResetStub(); std::thread echo_handler_thread(HandleEcho, &service, cqs_[0].get(), false); @@ -427,7 +449,7 @@ TEST_F(HybridEnd2endTest, AsyncEcho) { TEST_F(HybridEnd2endTest, RawEcho) { typedef EchoTestService::WithRawMethod_Echo SType; SType service; - SetUpServer(&service, nullptr, nullptr); + SetUpServer(&service, nullptr, nullptr, nullptr); ResetStub(); std::thread echo_handler_thread(HandleRawEcho, &service, cqs_[0].get(), false); @@ -438,7 +460,7 @@ TEST_F(HybridEnd2endTest, RawEcho) { TEST_F(HybridEnd2endTest, RawRequestStream) { typedef EchoTestService::WithRawMethod_RequestStream SType; SType service; - SetUpServer(&service, nullptr, nullptr); + SetUpServer(&service, nullptr, nullptr, nullptr); ResetStub(); std::thread request_stream_handler_thread(HandleRawClientStreaming, &service, cqs_[0].get()); @@ -451,7 +473,7 @@ TEST_F(HybridEnd2endTest, AsyncEchoRawRequestStream) { EchoTestService::WithAsyncMethod_Echo> SType; SType service; - SetUpServer(&service, nullptr, nullptr); + SetUpServer(&service, nullptr, nullptr, nullptr); ResetStub(); std::thread echo_handler_thread(HandleEcho, &service, cqs_[0].get(), false); @@ -468,7 +490,7 @@ TEST_F(HybridEnd2endTest, GenericEchoRawRequestStream) { SType; SType service; AsyncGenericService generic_service; - SetUpServer(&service, nullptr, &generic_service); + SetUpServer(&service, nullptr, &generic_service, nullptr); ResetStub(); std::thread generic_handler_thread(HandleGenericCall, &generic_service, cqs_[0].get()); @@ -484,7 +506,7 @@ TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) { EchoTestService::WithAsyncMethod_Echo> SType; SType service; - SetUpServer(&service, nullptr, nullptr); + SetUpServer(&service, nullptr, nullptr, nullptr); ResetStub(); std::thread echo_handler_thread(HandleEcho, &service, cqs_[0].get(), false); @@ -500,7 +522,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) { EchoTestService::WithAsyncMethod_ResponseStream> SType; SType service; - SetUpServer(&service, nullptr, nullptr); + SetUpServer(&service, nullptr, nullptr, nullptr); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -518,7 +540,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) { SType; SType service; TestServiceImplDupPkg dup_service; - SetUpServer(&service, &dup_service, nullptr); + SetUpServer(&service, &dup_service, nullptr, nullptr); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -557,7 +579,7 @@ TEST_F(HybridEnd2endTest, SType; SType service; StreamedUnaryDupPkg dup_service; - SetUpServer(&service, &dup_service, nullptr, 8192); + SetUpServer(&service, &dup_service, nullptr, nullptr, 8192); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -595,7 +617,7 @@ TEST_F(HybridEnd2endTest, SType; SType service; FullyStreamedUnaryDupPkg dup_service; - SetUpServer(&service, &dup_service, nullptr, 8192); + SetUpServer(&service, &dup_service, nullptr, nullptr, 8192); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -636,7 +658,7 @@ TEST_F(HybridEnd2endTest, SType; SType service; SplitResponseStreamDupPkg dup_service; - SetUpServer(&service, &dup_service, nullptr, 8192); + SetUpServer(&service, &dup_service, nullptr, nullptr, 8192); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -676,7 +698,7 @@ TEST_F(HybridEnd2endTest, SType; SType service; FullySplitStreamedDupPkg dup_service; - SetUpServer(&service, &dup_service, nullptr, 8192); + SetUpServer(&service, &dup_service, nullptr, nullptr, 8192); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -728,7 +750,7 @@ TEST_F(HybridEnd2endTest, SType; SType service; FullyStreamedDupPkg dup_service; - SetUpServer(&service, &dup_service, nullptr, 8192); + SetUpServer(&service, &dup_service, nullptr, nullptr, 8192); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -748,7 +770,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) { SType; SType service; duplicate::EchoTestService::AsyncService dup_service; - SetUpServer(&service, &dup_service, nullptr); + SetUpServer(&service, &dup_service, nullptr, nullptr); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, &service, cqs_[0].get()); @@ -767,7 +789,7 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) { TEST_F(HybridEnd2endTest, GenericEcho) { EchoTestService::WithGenericMethod_Echo service; AsyncGenericService generic_service; - SetUpServer(&service, nullptr, &generic_service); + SetUpServer(&service, nullptr, &generic_service, nullptr); ResetStub(); std::thread generic_handler_thread(HandleGenericCall, &generic_service, cqs_[0].get()); @@ -775,13 +797,56 @@ TEST_F(HybridEnd2endTest, GenericEcho) { generic_handler_thread.join(); } +TEST_P(HybridEnd2endTest, CallbackGenericEcho) { + EchoTestService::WithGenericMethod_Echo service; + class GenericEchoService : public experimental::CallbackGenericService { + private: + experimental::ServerGenericBidiReactor* CreateReactor() override { + class Reactor : public experimental::ServerGenericBidiReactor { + private: + void OnStarted(GenericServerContext* ctx) override { + ctx_ = ctx; + EXPECT_EQ(ctx->method(), "/grpc.testing.EchoTestService/Echo"); + StartRead(&request_); + } + void OnDone() override { delete this; } + void OnReadDone(bool ok) override { + if (!ok) { + EXPECT_EQ(reads_complete_, 1); + } else { + EXPECT_EQ(reads_complete_++, 0); + response_ = request_; + StartWrite(&response_); + StartRead(&request_); + } + } + void OnWriteDone(bool ok) override { + Finish(ok ? Status::OK + : Status(StatusCode::UNKNOWN, "Unexpected failure")); + } + GenericServerContext* ctx_; + ByteBuffer request_; + ByteBuffer response_; + std::atomic_int reads_complete_{0}; + }; + return new Reactor; + } + } generic_service; + + if (!SetUpServer(&service, nullptr, nullptr, &generic_service)) { + return; + } + ResetStub(); + TestAllMethods(); +} + TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) { typedef EchoTestService::WithAsyncMethod_RequestStream< EchoTestService::WithGenericMethod_Echo> SType; SType service; AsyncGenericService generic_service; - SetUpServer(&service, nullptr, &generic_service); + SetUpServer(&service, nullptr, &generic_service, nullptr); ResetStub(); std::thread generic_handler_thread(HandleGenericCall, &generic_service, cqs_[0].get()); @@ -800,7 +865,7 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_SyncDupService) { SType service; AsyncGenericService generic_service; TestServiceImplDupPkg dup_service; - SetUpServer(&service, &dup_service, &generic_service); + SetUpServer(&service, &dup_service, &generic_service, nullptr); ResetStub(); std::thread generic_handler_thread(HandleGenericCall, &generic_service, cqs_[0].get()); @@ -820,7 +885,7 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream_AsyncDupService) { SType service; AsyncGenericService generic_service; duplicate::EchoTestService::AsyncService dup_service; - SetUpServer(&service, &dup_service, &generic_service); + SetUpServer(&service, &dup_service, &generic_service, nullptr); ResetStub(); std::thread generic_handler_thread(HandleGenericCall, &generic_service, cqs_[0].get()); @@ -843,7 +908,7 @@ TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) { SType; SType service; AsyncGenericService generic_service; - SetUpServer(&service, nullptr, &generic_service); + SetUpServer(&service, nullptr, &generic_service, nullptr); ResetStub(); std::thread generic_handler_thread(HandleGenericCall, &generic_service, cqs_[0].get()); @@ -864,7 +929,7 @@ TEST_F(HybridEnd2endTest, GenericEchoRequestStreamAsyncResponseStream) { SType; SType service; AsyncGenericService generic_service; - SetUpServer(&service, nullptr, &generic_service); + SetUpServer(&service, nullptr, &generic_service, nullptr); ResetStub(); std::thread generic_handler_thread(HandleGenericCall, &generic_service, cqs_[0].get()); @@ -885,10 +950,13 @@ TEST_F(HybridEnd2endTest, GenericMethodWithoutGenericService) { EchoTestService::WithGenericMethod_Echo< EchoTestService::WithAsyncMethod_ResponseStream>> service; - SetUpServer(&service, nullptr, nullptr); + SetUpServer(&service, nullptr, nullptr, nullptr); EXPECT_EQ(nullptr, server_.get()); } +INSTANTIATE_TEST_CASE_P(HybridEnd2endTest, HybridEnd2endTest, + ::testing::Bool()); + } // namespace } // namespace testing } // namespace grpc