Exploratory refactoring for mixed sync/async methods on the same C++ service

pull/4638/head
Craig Tiller 9 years ago
parent 0850640213
commit 15f383c6cc
  1. 203
      include/grpc++/impl/method_handler_impl.h
  2. 186
      include/grpc++/impl/rpc_service_method.h
  3. 78
      include/grpc++/impl/service_type.h
  4. 27
      include/grpc++/server.h
  5. 35
      include/grpc++/server_builder.h
  6. 189
      src/compiler/cpp_generator.cc
  7. 39
      src/cpp/server/server.cc
  8. 45
      src/cpp/server/server_builder.cc
  9. 12
      test/cpp/end2end/async_end2end_test.cc
  10. 2
      test/cpp/qps/server_async.cc

@ -0,0 +1,203 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
#define GRPCXX_IMPL_METHOD_HANDLER_IMPL_H
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/support/sync_stream.h>
namespace grpc {
// A wrapper class of an application provided rpc method handler.
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
public:
RpcMethodHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(
param.request, &req, param.max_message_size);
ResponseType rsp;
if (status.ok()) {
status = func_(service_, param.server_context, &req, &rsp);
}
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.ok()) {
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
// Application provided rpc handler function.
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func_;
// The class the above handler function lives in.
ServiceType* service_;
};
// A wrapper class of an application provided client streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler : public MethodHandler {
public:
ClientStreamingHandler(
std::function<Status(ServiceType*, ServerContext*,
ServerReader<RequestType>*, ResponseType*)> func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
ResponseType rsp;
Status status = func_(service_, param.server_context, &reader, &rsp);
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.ok()) {
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
ResponseType*)> func_;
ServiceType* service_;
};
// A wrapper class of an application provided server streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler : public MethodHandler {
public:
ServerStreamingHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(
param.request, &req, param.max_message_size);
if (status.ok()) {
ServerWriter<ResponseType> writer(param.call, param.server_context);
status = func_(service_, param.server_context, &req, &writer);
}
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func_;
ServiceType* service_;
};
// A wrapper class of an application provided bidi-streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler : public MethodHandler {
public:
BidiStreamingHandler(
std::function<Status(ServiceType*, ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
Status status = func_(service_, param.server_context, &stream);
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<Status(ServiceType*, ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)> func_;
ServiceType* service_;
};
// Handle unknown method by returning UNIMPLEMENTED error.
class UnknownMethodHandler : public MethodHandler {
public:
template <class T>
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_);
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);
}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
FillOps(param.server_context, &ops);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
};
} // namespace grpc
#endif // GRPCXX_IMPL_METHOD_HANDLER_IMPL_H

@ -43,7 +43,6 @@
#include <grpc++/impl/rpc_method.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
#include <grpc++/support/sync_stream.h>
namespace grpc {
class ServerContext;
@ -71,197 +70,24 @@ class MethodHandler {
virtual void RunHandler(const HandlerParameter& param) = 0;
};
// A wrapper class of an application provided rpc method handler.
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
public:
RpcMethodHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(
param.request, &req, param.max_message_size);
ResponseType rsp;
if (status.ok()) {
status = func_(service_, param.server_context, &req, &rsp);
}
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.ok()) {
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
// Application provided rpc handler function.
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func_;
// The class the above handler function lives in.
ServiceType* service_;
};
// A wrapper class of an application provided client streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler : public MethodHandler {
public:
ClientStreamingHandler(
std::function<Status(ServiceType*, ServerContext*,
ServerReader<RequestType>*, ResponseType*)> func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
ResponseType rsp;
Status status = func_(service_, param.server_context, &reader, &rsp);
GPR_ASSERT(!param.server_context->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.ok()) {
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
ResponseType*)> func_;
ServiceType* service_;
};
// A wrapper class of an application provided server streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler : public MethodHandler {
public:
ServerStreamingHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
RequestType req;
Status status = SerializationTraits<RequestType>::Deserialize(
param.request, &req, param.max_message_size);
if (status.ok()) {
ServerWriter<ResponseType> writer(param.call, param.server_context);
status = func_(service_, param.server_context, &req, &writer);
}
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func_;
ServiceType* service_;
};
// A wrapper class of an application provided bidi-streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler : public MethodHandler {
public:
BidiStreamingHandler(
std::function<Status(ServiceType*, ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
Status status = func_(service_, param.server_context, &stream);
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<Status(ServiceType*, ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)> func_;
ServiceType* service_;
};
// Handle unknown method by returning UNIMPLEMENTED error.
class UnknownMethodHandler : public MethodHandler {
public:
template <class T>
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_);
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);
}
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
FillOps(param.server_context, &ops);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
};
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
// Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
MethodHandler* handler)
: RpcMethod(name, type), handler_(handler) {}
: RpcMethod(name, type), server_tag_(nullptr), handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }
void set_server_tag(void* tag) { server_tag_ = tag; }
void* server_tag() const { return server_tag_; }
// if MethodHandler is nullptr, then this is an async method
MethodHandler* handler() const { return handler_.get(); }
private:
void* server_tag_;
std::unique_ptr<MethodHandler> handler_;
};
// This class contains all the method information for an rpc service. It is
// used for registering a service on a grpc server.
class RpcService {
public:
// Takes ownership.
void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); }
int GetMethodCount() const {
// On win x64, int is only 32bit
GPR_ASSERT(methods_.size() <= INT_MAX);
return (int)methods_.size();
}
private:
std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
};
} // namespace grpc
#endif // GRPCXX_IMPL_RPC_SERVICE_METHOD_H

@ -34,6 +34,7 @@
#ifndef GRPCXX_IMPL_SERVICE_TYPE_H
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/server.h>
#include <grpc++/support/config.h>
@ -43,17 +44,10 @@ namespace grpc {
class Call;
class CompletionQueue;
class RpcService;
class Server;
class ServerCompletionQueue;
class ServerContext;
class SynchronousService {
public:
virtual ~SynchronousService() {}
virtual RpcService* service() = 0;
};
class ServerAsyncStreamingInterface {
public:
virtual ~ServerAsyncStreamingInterface() {}
@ -65,15 +59,27 @@ class ServerAsyncStreamingInterface {
virtual void BindCall(Call* call) = 0;
};
class AsynchronousService {
class Service {
public:
AsynchronousService(const char** method_names, size_t method_count)
: server_(nullptr),
method_names_(method_names),
method_count_(method_count),
request_args_(nullptr) {}
virtual ~Service() {}
~AsynchronousService() { delete[] request_args_; }
bool has_async_methods() const {
for (auto it = methods_.begin(); it != methods_.end(); ++it) {
if ((*it)->handler() == nullptr) {
return true;
}
}
return false;
}
bool has_synchronous_methods() const {
for (auto it = methods_.begin(); it != methods_.end(); ++it) {
if ((*it)->handler() != nullptr) {
return true;
}
}
return false;
}
protected:
template <class Message>
@ -81,41 +87,41 @@ class AsynchronousService {
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
void RequestAsyncClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
template <class Message>
void RequestServerStreaming(int index, ServerContext* context,
Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
void RequestAsyncServerStreaming(int index, ServerContext* context,
Message* request,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(request_args_[index], context, stream, call_cq,
void RequestAsyncBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
private:
friend class Server;
Server* server_;
const char** const method_names_;
size_t method_count_;
void** request_args_;
std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
};
} // namespace grpc

@ -40,6 +40,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/sync.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/support/channel_arguments.h>
@ -51,13 +52,11 @@ struct grpc_server;
namespace grpc {
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
class ServerAsyncStreamingInterface;
class ServerContext;
class Service;
class ThreadPoolInterface;
/// Models a gRPC server.
@ -105,7 +104,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
private:
friend class AsyncGenericService;
friend class AsynchronousService;
friend class Service;
friend class ServerBuilder;
class SyncRequest;
@ -123,12 +122,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
bool RegisterService(const grpc::string* host, RpcService* service);
/// Register an asynchronous service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
bool RegisterAsyncService(const grpc::string* host,
AsynchronousService* service);
bool RegisterService(const grpc::string* host, Service* service);
/// Register a generic service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
@ -265,21 +259,22 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
class UnimplementedAsyncResponse;
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
call_cq, notification_cq, tag, message);
new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
stream, call_cq, notification_cq, tag,
message);
}
void RequestAsyncCall(void* registered_method, ServerContext* context,
void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
notification_cq, tag);
new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
call_cq, notification_cq, tag);
}
void RequestAsyncGenericCall(GenericServerContext* context,

@ -44,14 +44,12 @@
namespace grpc {
class AsyncGenericService;
class AsynchronousService;
class CompletionQueue;
class RpcService;
class Server;
class ServerCompletionQueue;
class ServerCredentials;
class SynchronousService;
class ThreadPoolInterface;
class Service;
/// A builder class for the creation and startup of \a grpc::Server instances.
class ServerBuilder {
@ -62,14 +60,7 @@ class ServerBuilder {
/// The service must exist for the lifetime of the \a Server instance returned
/// by \a BuildAndStart().
/// Matches requests with any :authority
void RegisterService(SynchronousService* service);
/// Register an asynchronous service.
/// This call does not take ownership of the service or completion queue.
/// The service and completion queuemust exist for the lifetime of the \a
/// Server instance returned by \a BuildAndStart().
/// Matches requests with any :authority
void RegisterAsyncService(AsynchronousService* service);
void RegisterService(Service* service);
/// Register a generic service.
/// Matches requests with any :authority
@ -79,15 +70,7 @@ class ServerBuilder {
/// The service must exist for the lifetime of the \a Server instance returned
/// by BuildAndStart().
/// Only matches requests with :authority \a host
void RegisterService(const grpc::string& host, SynchronousService* service);
/// Register an asynchronous service.
/// This call does not take ownership of the service or completion queue.
/// The service and completion queuemust exist for the lifetime of the \a
/// Server instance returned by \a BuildAndStart().
/// Only matches requests with :authority equal to \a host
void RegisterAsyncService(const grpc::string& host,
AsynchronousService* service);
void RegisterService(const grpc::string& host, Service* service);
/// Set max message size in bytes.
void SetMaxMessageSize(int max_message_size) {
@ -132,26 +115,22 @@ class ServerBuilder {
};
typedef std::unique_ptr<grpc::string> HostString;
template <class T>
struct NamedService {
explicit NamedService(T* s) : service(s) {}
NamedService(const grpc::string& h, T* s)
explicit NamedService(Service* s) : service(s) {}
NamedService(const grpc::string& h, Service* s)
: host(new grpc::string(h)), service(s) {}
HostString host;
T* service;
Service* service;
};
int max_message_size_;
grpc_compression_options compression_options_;
std::vector<std::unique_ptr<ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService<RpcService>>> services_;
std::vector<std::unique_ptr<NamedService<AsynchronousService>>>
async_services_;
std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_;
ThreadPoolInterface* thread_pool_;
};
} // namespace grpc

@ -491,39 +491,109 @@ void PrintHeaderServerMethodAsync(
grpc_cpp_generator::ClassName(method->input_type(), true);
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
printer->Print(*vars, "template <class BaseClass>\n");
printer->Print(*vars,
"class WithAsyncMethod_$Method$ : public BaseClass {\n");
printer->Print(
" private:\n"
" void BaseClassMustBeDerivedFromService(Service *service) {}\n");
printer->Print(" public:\n");
printer->Indent();
printer->Print(*vars,
"~WithAsyncMethod_$Method$() {\n"
" BaseClassMustBeDerivedFromService(this);\n"
"}\n");
if (NoStreaming(method)) {
printer->Print(
*vars,
"// disable synchronous version of this method\n"
"::grpc::Status $Method$("
"::grpc::ServerContext* context, const $Request$* request, "
"$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
" abort();\n"
" return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
"}\n");
printer->Print(
*vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" ::grpc::Service::RequestAsyncUnary($Idx$, context, "
"request, response, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
"// disable synchronous version of this method\n"
"::grpc::Status $Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReader< $Request$>* reader, "
"$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
" abort();\n"
" return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
"}\n");
printer->Print(
*vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" ::grpc::Service::RequestAsyncClientStreaming($Idx$, "
"context, reader, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"// disable synchronous version of this method\n"
"::grpc::Status $Method$("
"::grpc::ServerContext* context, const $Request$* request, "
"::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE "
"{\n"
" abort();\n"
" return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
"}\n");
printer->Print(
*vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(
*vars,
" ::grpc::Service::RequestAsyncServerStreaming($Idx$, "
"context, request, writer, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"// disable synchronous version of this method\n"
"::grpc::Status $Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReaderWriter< $Response$, $Request$>* stream) "
"GRPC_FINAL GRPC_OVERRIDE {\n"
" abort();\n"
" return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
"}\n");
printer->Print(
*vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" ::grpc::Service::RequestAsyncBidiStreaming($Idx$, "
"context, stream, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n");
}
printer->Outdent();
printer->Print(*vars, "};\n");
}
void PrintHeaderService(grpc::protobuf::io::Printer *printer,
@ -580,9 +650,9 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("\n");
// Server side - Synchronous
// Server side - base
printer->Print(
"class Service : public ::grpc::SynchronousService {\n"
"class Service : public ::grpc::Service {\n"
" public:\n");
printer->Indent();
printer->Print("Service();\n");
@ -590,26 +660,26 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodSync(printer, service->method(i), vars);
}
printer->Print("::grpc::RpcService* service() GRPC_OVERRIDE GRPC_FINAL;\n");
printer->Outdent();
printer->Print(
" private:\n"
" std::unique_ptr< ::grpc::RpcService> service_;\n");
printer->Print("};\n");
// Server side - Asynchronous
printer->Print(
"class AsyncService GRPC_FINAL : public ::grpc::AsynchronousService {\n"
" public:\n");
printer->Indent();
(*vars)["MethodCount"] = as_string(service->method_count());
printer->Print("explicit AsyncService();\n");
printer->Print("~AsyncService() {};\n");
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Idx"] = as_string(i);
PrintHeaderServerMethodAsync(printer, service->method(i), vars);
}
printer->Outdent();
printer->Print("};\n");
printer->Print("typedef ");
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["method_name"] = service->method(i)->name();
printer->Print(*vars, "WithAsyncMethod_$method_name$<");
}
printer->Print("Service");
for (int i = 0; i < service->method_count(); ++i) {
printer->Print(" >");
}
printer->Print(" AsyncService;\n");
printer->Outdent();
printer->Print("};\n");
@ -889,69 +959,6 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
}
}
void PrintSourceServerAsyncMethod(
grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true);
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
printer->Print(
*vars,
"void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"$Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" AsynchronousService::RequestAsyncUnary($Idx$, context, "
"request, response, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
"void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" AsynchronousService::RequestClientStreaming($Idx$, "
"context, reader, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"$Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(
*vars,
" AsynchronousService::RequestServerStreaming($Idx$, "
"context, request, writer, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars,
" AsynchronousService::RequestBidiStreaming($Idx$, "
"context, stream, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n");
}
}
void PrintSourceService(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::ServiceDescriptor *service,
std::map<grpc::string, grpc::string> *vars) {
@ -1006,13 +1013,6 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
PrintSourceClientMethod(printer, service->method(i), vars);
}
(*vars)["MethodCount"] = as_string(service->method_count());
printer->Print(*vars,
"$ns$$Service$::AsyncService::AsyncService() : "
"::grpc::AsynchronousService("
"$prefix$$Service$_method_names, $MethodCount$) "
"{}\n\n");
printer->Print(*vars,
"$ns$$Service$::Service::Service() {\n"
"}\n\n");
@ -1022,15 +1022,9 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Idx"] = as_string(i);
PrintSourceServerMethod(printer, service->method(i), vars);
PrintSourceServerAsyncMethod(printer, service->method(i), vars);
}
printer->Print(*vars,
"::grpc::RpcService* $ns$$Service$::Service::service() {\n");
printer->Indent();
printer->Print(
"if (service_) {\n"
" return service_.get();\n"
"}\n");
#if 0
printer->Print("service_ = std::unique_ptr< ::grpc::RpcService>(new ::grpc::RpcService());\n");
for (int i = 0; i < service->method_count(); ++i) {
const grpc::protobuf::MethodDescriptor *method = service->method(i);
@ -1082,6 +1076,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print("return service_.get();\n");
printer->Outdent();
printer->Print("}\n\n");
#endif
}
grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,

@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/generic/async_generic_service.h>
#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
@ -314,36 +315,28 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
g_callbacks = callbacks;
}
bool Server::RegisterService(const grpc::string* host, RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
bool Server::RegisterService(const grpc::string* host, Service* service) {
bool has_async_methods = service->has_async_methods();
if (has_async_methods) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr);
if (!tag) {
if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
sync_methods_->emplace_back(method, tag);
}
return true;
}
bool Server::RegisterAsyncService(const grpc::string* host,
AsynchronousService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->server_ = this;
service->request_args_ = new void* [service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
host ? host->c_str() : nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
service->method_names_[i]);
return false;
if (method->handler() == nullptr) {
method->set_server_tag(tag);
} else {
sync_methods_->emplace_back(method, tag);
}
service->request_args_[i] = tag;
}
return true;
}

@ -43,7 +43,7 @@
namespace grpc {
ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {
: max_message_size_(-1), generic_service_(nullptr) {
grpc_compression_options_init(&compression_options_);
}
@ -53,24 +53,13 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
return std::unique_ptr<ServerCompletionQueue>(cq);
}
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.emplace_back(new NamedService<RpcService>(service->service()));
}
void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
async_services_.emplace_back(new NamedService<AsynchronousService>(service));
void ServerBuilder::RegisterService(Service* service) {
services_.emplace_back(new NamedService(service));
}
void ServerBuilder::RegisterService(const grpc::string& addr,
SynchronousService* service) {
services_.emplace_back(
new NamedService<RpcService>(addr, service->service()));
}
void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
AsynchronousService* service) {
async_services_.emplace_back(
new NamedService<AsynchronousService>(addr, service));
Service* service) {
services_.emplace_back(new NamedService(addr, service));
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@ -96,14 +85,13 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
bool thread_pool_owned = false;
if (!async_services_.empty() && !services_.empty()) {
gpr_log(GPR_ERROR, "Mixing async and sync services is unsupported for now");
return nullptr;
}
if (!thread_pool_ && !services_.empty()) {
thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
std::unique_ptr<ThreadPoolInterface> thread_pool;
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) {
if (thread_pool == nullptr && !services_.empty()) {
thread_pool.reset(CreateDefaultThreadPool());
}
}
}
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
@ -115,7 +103,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_, args));
new Server(thread_pool.release(), true, max_message_size_, args));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
@ -126,13 +114,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
for (auto service = async_services_.begin(); service != async_services_.end();
service++) {
if (!server->RegisterAsyncService((*service)->host.get(),
(*service)->service)) {
return nullptr;
}
}
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
}

@ -180,21 +180,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// It is currently unsupported to mix sync and async services
// in the same server, so first test that (for coverage)
ServerBuilder build_bad;
build_bad.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
build_bad.RegisterAsyncService(&service_);
grpc::cpp::test::util::TestService::Service sync_service;
build_bad.RegisterService(&sync_service);
GPR_ASSERT(build_bad.BuildAndStart() == nullptr);
// Setup server
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
builder.RegisterAsyncService(&service_);
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
}

@ -67,7 +67,7 @@ class AsyncQpsServerTest : public Server {
Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
builder.RegisterService(&async_service_);
for (int i = 0; i < config.async_server_threads(); i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}

Loading…
Cancel
Save