Support sync/async methods in the same service

pull/4714/head
yang-g 9 years ago
parent dc548ed65d
commit bef0d8744e
  1. 3
      include/grpc++/impl/rpc_service_method.h
  2. 13
      include/grpc++/impl/service_type.h
  3. 48
      src/compiler/cpp_generator.cc
  4. 3
      src/cpp/server/server_builder.cc

@ -48,8 +48,6 @@ namespace grpc {
class ServerContext;
class StreamContextInterface;
// TODO(rocking): we might need to split this file into multiple ones.
// Base class for running an RPC handler.
class MethodHandler {
public:
@ -82,6 +80,7 @@ class RpcServiceMethod : public RpcMethod {
void* server_tag() const { return server_tag_; }
// if MethodHandler is nullptr, then this is an async method
MethodHandler* handler() const { return handler_.get(); }
void ResetHandler() { handler_.reset(); }
private:
void* server_tag_;

@ -61,6 +61,7 @@ class ServerAsyncStreamingInterface {
class Service {
public:
Service() : server_(nullptr) {}
virtual ~Service() {}
bool has_async_methods() const {
@ -117,6 +118,18 @@ class Service {
notification_cq, tag);
}
void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
void MarkMethodAsync(const grpc::string& method_name) {
for (auto it = methods_.begin(); it != methods_.end(); ++it) {
if ((*it)->name() == method_name) {
(*it)->ResetHandler();
return;
}
}
abort();
}
private:
friend class Server;

@ -500,7 +500,12 @@ void PrintHeaderServerMethodAsync(
printer->Print(" public:\n");
printer->Indent();
printer->Print(*vars,
"~WithAsyncMethod_$Method$() {\n"
"WithAsyncMethod_$Method$() {\n"
" ::grpc::Service::MarkMethodAsync("
"\"/$Package$$Service$/$Method$\");\n"
"}\n");
printer->Print(*vars,
"~WithAsyncMethod_$Method$() GRPC_OVERRIDE {\n"
" BaseClassMustBeDerivedFromService(this);\n"
"}\n");
if (NoStreaming(method)) {
@ -693,6 +698,12 @@ grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
// Package string is empty or ends with a dot. It is used to fully qualify
// method names.
vars["Package"] = file->package();
if (!file->package().empty()) {
vars["Package"].append(".");
}
if (!params.services_namespace.empty()) {
vars["services_namespace"] = params.services_namespace;
@ -774,6 +785,7 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
printer.Print(vars, "#include <grpc++/channel.h>\n");
printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/impl/method_handler_impl.h>\n");
printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
printer.Print(vars, "#include <grpc++/support/async_unary_call.h>\n");
@ -1013,19 +1025,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
PrintSourceClientMethod(printer, service->method(i), vars);
}
printer->Print(*vars,
"$ns$$Service$::Service::Service() {\n"
"}\n\n");
printer->Print(*vars,
"$ns$$Service$::Service::~Service() {\n"
"}\n\n");
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Idx"] = as_string(i);
PrintSourceServerMethod(printer, service->method(i), vars);
}
#if 0
printer->Print("service_ = std::unique_ptr< ::grpc::RpcService>(new ::grpc::RpcService());\n");
printer->Print(*vars, "$ns$$Service$::Service::Service() {\n");
printer->Indent();
for (int i = 0; i < service->method_count(); ++i) {
const grpc::protobuf::MethodDescriptor *method = service->method(i);
(*vars)["Idx"] = as_string(i);
@ -1037,7 +1038,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
if (NoStreaming(method)) {
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
"AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::NORMAL_RPC,\n"
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
@ -1047,7 +1048,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
"AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
@ -1056,7 +1057,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
"AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
@ -1065,7 +1066,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
"AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
@ -1073,10 +1074,15 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
printer->Print("return service_.get();\n");
printer->Outdent();
printer->Print("}\n\n");
#endif
printer->Print(*vars, "}\n\n");
printer->Print(*vars,
"$ns$$Service$::Service::~Service() {\n"
"}\n\n");
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Idx"] = as_string(i);
PrintSourceServerMethod(printer, service->method(i), vars);
}
}
grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,

@ -88,8 +88,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<ThreadPoolInterface> thread_pool;
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) {
if (thread_pool == nullptr && !services_.empty()) {
if (thread_pool == nullptr) {
thread_pool.reset(CreateDefaultThreadPool());
break;
}
}
}

Loading…
Cancel
Save