From 04af168cf852236f71d08f34d45442725567b9c6 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Wed, 20 Mar 2019 13:05:36 -0700 Subject: [PATCH 1/5] Move Server into grpc_impl from grpc --- BUILD | 1 + .../impl/codegen/async_generic_service.h | 10 +- include/grpcpp/impl/codegen/async_stream.h | 2 +- .../grpcpp/impl/codegen/completion_queue.h | 9 +- include/grpcpp/impl/codegen/server_context.h | 7 +- include/grpcpp/impl/codegen/service_type.h | 7 +- include/grpcpp/impl/server_initializer.h | 5 +- include/grpcpp/security/server_credentials.h | 7 +- include/grpcpp/server.h | 325 +--------- include/grpcpp/server_builder.h | 7 +- include/grpcpp/server_impl.h | 353 +++++++++++ src/cpp/server/server_cc.cc | 560 +++++++++--------- test/cpp/util/metrics_server.h | 2 + 13 files changed, 677 insertions(+), 618 deletions(-) create mode 100644 include/grpcpp/server_impl.h diff --git a/BUILD b/BUILD index 9e052dcf0c2..ee6e4fce32d 100644 --- a/BUILD +++ b/BUILD @@ -246,6 +246,7 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/security/credentials.h", "include/grpcpp/security/server_credentials.h", "include/grpcpp/server.h", + "include/grpcpp/server_impl.h", "include/grpcpp/server_builder.h", "include/grpcpp/server_context.h", "include/grpcpp/server_posix.h", diff --git a/include/grpcpp/impl/codegen/async_generic_service.h b/include/grpcpp/impl/codegen/async_generic_service.h index 46489b135d7..4e680bb1c76 100644 --- a/include/grpcpp/impl/codegen/async_generic_service.h +++ b/include/grpcpp/impl/codegen/async_generic_service.h @@ -39,7 +39,7 @@ class GenericServerContext final : public ServerContext { const grpc::string& host() const { return host_; } private: - friend class Server; + friend class grpc_impl::Server; friend class ServerInterface; void Clear() { @@ -79,8 +79,8 @@ class AsyncGenericService final { ServerCompletionQueue* notification_cq, void* tag); private: - friend class Server; - Server* server_; + friend class grpc_impl::Server; + grpc_impl::Server* server_; }; namespace experimental { @@ -117,14 +117,14 @@ class CallbackGenericService { } private: - friend class ::grpc::Server; + friend class ::grpc_impl::Server; internal::CallbackBidiHandler* Handler() { return new internal::CallbackBidiHandler( [this] { return CreateReactor(); }); } - Server* server_{nullptr}; + grpc_impl::Server* server_{nullptr}; }; } // namespace experimental } // namespace grpc diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h index bfb2df4f232..e12923ea02c 100644 --- a/include/grpcpp/impl/codegen/async_stream.h +++ b/include/grpcpp/impl/codegen/async_stream.h @@ -1099,7 +1099,7 @@ class ServerAsyncReaderWriter final } private: - friend class ::grpc::Server; + friend class ::grpc_impl::Server; void BindCall(::grpc::internal::Call* call) override { call_ = *call; } diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 4812f0253d4..559b7b7fe47 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -41,6 +41,10 @@ struct grpc_completion_queue; +namespace grpc_impl { + +class Server; +} // namespace grpc_impl namespace grpc { template @@ -62,7 +66,6 @@ class Channel; class ChannelInterface; class ClientContext; class CompletionQueue; -class Server; class ServerBuilder; class ServerContext; class ServerInterface; @@ -271,7 +274,7 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ::grpc::internal::TemplatedBidiStreamingHandler; template friend class ::grpc::internal::ErrorMethodHandler; - friend class ::grpc::Server; + friend class ::grpc_impl::Server; friend class ::grpc::ServerContext; friend class ::grpc::ServerInterface; template @@ -406,7 +409,7 @@ class ServerCompletionQueue : public CompletionQueue { grpc_cq_polling_type polling_type_; friend class ServerBuilder; - friend class Server; + friend class grpc_impl::Server; }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 591a9ff9549..c2959d95de7 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -41,11 +41,14 @@ struct grpc_metadata; struct grpc_call; struct census_context; +namespace grpc_impl { + +class Server; +} // namespace grpc_impl namespace grpc { class ClientContext; class GenericServerContext; class CompletionQueue; -class Server; class ServerInterface; template class ServerAsyncReader; @@ -269,7 +272,7 @@ class ServerContext { friend class ::grpc::testing::InteropServerContextInspector; friend class ::grpc::testing::ServerContextTestSpouse; friend class ::grpc::ServerInterface; - friend class ::grpc::Server; + friend class ::grpc_impl::Server; template friend class ::grpc::ServerAsyncReader; template diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h index 332a04c294f..7ac482c19d7 100644 --- a/include/grpcpp/impl/codegen/service_type.h +++ b/include/grpcpp/impl/codegen/service_type.h @@ -26,10 +26,13 @@ #include #include +namespace grpc_impl { + +class Server; +} // namespace grpc_impl namespace grpc { class CompletionQueue; -class Server; class ServerInterface; class ServerCompletionQueue; class ServerContext; @@ -228,7 +231,7 @@ class Service { } private: - friend class Server; + friend class grpc_impl::Server; friend class ServerInterface; ServerInterface* server_; std::vector> methods_; diff --git a/include/grpcpp/impl/server_initializer.h b/include/grpcpp/impl/server_initializer.h index f949fabfe6f..bd18ee04927 100644 --- a/include/grpcpp/impl/server_initializer.h +++ b/include/grpcpp/impl/server_initializer.h @@ -24,9 +24,12 @@ #include -namespace grpc { +namespace grpc_impl { class Server; +} // namespace grpc_impl +namespace grpc { + class Service; class ServerInitializer { diff --git a/include/grpcpp/security/server_credentials.h b/include/grpcpp/security/server_credentials.h index bd00a0a1737..a519caf9913 100644 --- a/include/grpcpp/security/server_credentials.h +++ b/include/grpcpp/security/server_credentials.h @@ -28,8 +28,11 @@ struct grpc_server; -namespace grpc { +namespace grpc_impl { + class Server; +} // namespace grpc_impl +namespace grpc { /// Wrapper around \a grpc_server_credentials, a way to authenticate a server. class ServerCredentials { @@ -42,7 +45,7 @@ class ServerCredentials { const std::shared_ptr& processor) = 0; private: - friend class ::grpc::Server; + friend class ::grpc_impl::Server; /// Tries to bind \a server to the given \a addr (eg, localhost:1234, /// 192.168.1.1:31416, [::1]:27182, etc.) diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index f5c99f22df2..3de2aba0b59 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015 gRPC authors. + * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,330 +19,11 @@ #ifndef GRPCPP_SERVER_H #define GRPCPP_SERVER_H -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -struct grpc_server; +#include namespace grpc { -class AsyncGenericService; -class HealthCheckServiceInterface; -class ServerContext; -class ServerInitializer; - -/// Represents a gRPC server. -/// -/// Use a \a grpc::ServerBuilder to create, configure, and start -/// \a Server instances. -class Server : public ServerInterface, private GrpcLibraryCodegen { - public: - ~Server(); - - /// Block until the server shuts down. - /// - /// \warning The server must be either shutting down or some other thread must - /// call \a Shutdown for this function to ever return. - void Wait() override; - - /// Global callbacks are a set of hooks that are called when server - /// events occur. \a SetGlobalCallbacks method is used to register - /// the hooks with gRPC. Note that - /// the \a GlobalCallbacks instance will be shared among all - /// \a Server instances in an application and can be set exactly - /// once per application. - class GlobalCallbacks { - public: - virtual ~GlobalCallbacks() {} - /// Called before server is created. - virtual void UpdateArguments(ChannelArguments* args) {} - /// Called before application callback for each synchronous server request - virtual void PreSynchronousRequest(ServerContext* context) = 0; - /// Called after application callback for each synchronous server request - virtual void PostSynchronousRequest(ServerContext* context) = 0; - /// Called before server is started. - virtual void PreServerStart(Server* server) {} - /// Called after a server port is added. - virtual void AddPort(Server* server, const grpc::string& addr, - ServerCredentials* creds, int port) {} - }; - /// Set the global callback object. Can only be called once per application. - /// Does not take ownership of callbacks, and expects the pointed to object - /// to be alive until all server objects in the process have been destroyed. - /// The same \a GlobalCallbacks object will be used throughout the - /// application and is shared among all \a Server objects. - static void SetGlobalCallbacks(GlobalCallbacks* callbacks); - - /// Returns a \em raw pointer to the underlying \a grpc_server instance. - /// EXPERIMENTAL: for internal/test use only - grpc_server* c_server(); - - /// Returns the health check service. - HealthCheckServiceInterface* GetHealthCheckService() const { - return health_check_service_.get(); - } - - /// Establish a channel for in-process communication - std::shared_ptr InProcessChannel(const ChannelArguments& args); - - /// NOTE: class experimental_type is not part of the public API of this class. - /// TODO(yashykt): Integrate into public API when this is no longer - /// experimental. - class experimental_type { - public: - explicit experimental_type(Server* server) : server_(server) {} - - /// Establish a channel for in-process communication with client - /// interceptors - std::shared_ptr InProcessChannelWithInterceptors( - const ChannelArguments& args, - std::vector< - std::unique_ptr> - interceptor_creators); - - private: - Server* server_; - }; - - /// NOTE: The function experimental() is not stable public API. It is a view - /// to the experimental components of this class. It may be changed or removed - /// at any time. - experimental_type experimental() { return experimental_type(this); } - - protected: - /// Register a service. This call does not take ownership of the service. - /// The service must exist for the lifetime of the Server instance. - bool RegisterService(const grpc::string* host, Service* service) override; - - /// Try binding the server to the given \a addr endpoint - /// (port, and optionally including IP address to bind to). - /// - /// It can be invoked multiple times. Should be used before - /// starting the server. - /// - /// \param addr The address to try to bind to the server (eg, localhost:1234, - /// 192.168.1.1:31416, [::1]:27182, etc.). - /// \param creds The credentials associated with the server. - /// - /// \return bound port number on success, 0 on failure. - /// - /// \warning It is an error to call this method on an already started server. - int AddListeningPort(const grpc::string& addr, - ServerCredentials* creds) override; - - /// NOTE: This is *NOT* a public API. The server constructors are supposed to - /// be used by \a ServerBuilder class only. The constructor will be made - /// 'private' very soon. - /// - /// Server constructors. To be used by \a ServerBuilder only. - /// - /// \param max_message_size Maximum message length that the channel can - /// receive. - /// - /// \param args The channel args - /// - /// \param sync_server_cqs The completion queues to use if the server is a - /// synchronous server (or a hybrid server). The server polls for new RPCs on - /// these queues - /// - /// \param min_pollers The minimum number of polling threads per server - /// completion queue (in param sync_server_cqs) to use for listening to - /// incoming requests (used only in case of sync server) - /// - /// \param max_pollers The maximum number of polling threads per server - /// completion queue (in param sync_server_cqs) to use for listening to - /// incoming requests (used only in case of sync server) - /// - /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on - /// server completion queues passed via sync_server_cqs param. - Server(int max_message_size, ChannelArguments* args, - std::shared_ptr>> - sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, - grpc_resource_quota* server_rq = nullptr, - std::vector< - std::unique_ptr> - interceptor_creators = std::vector>()); - - /// Start the server. - /// - /// \param cqs Completion queues for handling asynchronous services. The - /// caller is required to keep all completion queues live until the server is - /// destroyed. - /// \param num_cqs How many completion queues does \a cqs hold. - void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; - - grpc_server* server() override { return server_; } - - private: - std::vector>* - interceptor_creators() override { - return &interceptor_creators_; - } - - friend class AsyncGenericService; - friend class ServerBuilder; - friend class ServerInitializer; - - class SyncRequest; - class CallbackRequestBase; - template - class CallbackRequest; - class UnimplementedAsyncRequest; - class UnimplementedAsyncResponse; - - /// SyncRequestThreadManager is an implementation of ThreadManager. This class - /// is responsible for polling for incoming RPCs and calling the RPC handlers. - /// This is only used in case of a Sync server (i.e a server exposing a sync - /// interface) - class SyncRequestThreadManager; - - /// Register a generic service. This call does not take ownership of the - /// service. The service must exist for the lifetime of the Server instance. - void RegisterAsyncGenericService(AsyncGenericService* service) override; - - /// NOTE: class experimental_registration_type is not part of the public API - /// of this class - /// TODO(vjpai): Move these contents to the public API of Server when - /// they are no longer experimental - class experimental_registration_type final - : public experimental_registration_interface { - public: - explicit experimental_registration_type(Server* server) : server_(server) {} - void RegisterCallbackGenericService( - experimental::CallbackGenericService* service) override { - server_->RegisterCallbackGenericService(service); - } - - private: - Server* server_; - }; - - /// TODO(vjpai): Mark this override when experimental type above is deleted - void RegisterCallbackGenericService( - experimental::CallbackGenericService* service); - - /// NOTE: The function experimental_registration() is not stable public API. - /// It is a view to the experimental components of this class. It may be - /// changed or removed at any time. - experimental_registration_interface* experimental_registration() override { - return &experimental_registration_; - } - - void PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) override; - - void ShutdownInternal(gpr_timespec deadline) override; - - int max_receive_message_size() const override { - return max_receive_message_size_; - } - - CompletionQueue* CallbackCQ() override; - - ServerInitializer* initializer(); - - // A vector of interceptor factory objects. - // This should be destroyed after health_check_service_ and this requirement - // is satisfied by declaring interceptor_creators_ before - // health_check_service_. (C++ mandates that member objects be destroyed in - // the reverse order of initialization.) - std::vector> - interceptor_creators_; - - const int max_receive_message_size_; - - /// The following completion queues are ONLY used in case of Sync API - /// i.e. if the server has any services with sync methods. The server uses - /// these completion queues to poll for new RPCs - std::shared_ptr>> - sync_server_cqs_; - - /// List of \a ThreadManager instances (one for each cq in - /// the \a sync_server_cqs) - std::vector> sync_req_mgrs_; - - // Outstanding unmatched callback requests, indexed by method. - // NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't - // copyable or movable and thus will cause compilation errors. We - // actually only want to extend the vector before the threaded use - // starts, but this is still a limitation. - std::vector callback_unmatched_reqs_count_; - - // List of callback requests to start when server actually starts. - std::list callback_reqs_to_start_; - - // For registering experimental callback generic service; remove when that - // method longer experimental - experimental_registration_type experimental_registration_{this}; - - // Server status - std::mutex mu_; - bool started_; - bool shutdown_; - bool shutdown_notified_; // Was notify called on the shutdown_cv_ - - std::condition_variable shutdown_cv_; - - // It is ok (but not required) to nest callback_reqs_mu_ under mu_ . - // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be - // decremented under the lock in case it is the last request and enables the - // server shutdown. The increment is performance-critical since it happens - // during periods of increasing load; the decrement happens only when memory - // is maxed out, during server shutdown, or (possibly in a future version) - // during decreasing load, so it is less performance-critical. - std::mutex callback_reqs_mu_; - std::condition_variable callback_reqs_done_cv_; - std::atomic_int callback_reqs_outstanding_{0}; - - std::shared_ptr global_callbacks_; - - std::vector services_; - bool has_async_generic_service_{false}; - bool has_callback_generic_service_{false}; - - // Pointer to the wrapped grpc_server. - grpc_server* server_; - - std::unique_ptr server_initializer_; - - std::unique_ptr health_check_service_; - bool health_check_service_disabled_; - - // When appropriate, use a default callback generic service to handle - // unimplemented methods - std::unique_ptr unimplemented_service_; - - // A special handler for resource exhausted in sync case - std::unique_ptr resource_exhausted_handler_; - - // Handler for callback generic service, if any - std::unique_ptr generic_handler_; - - // callback_cq_ references the callbackable completion queue associated - // with this server (if any). It is set on the first call to CallbackCQ(). - // It is _not owned_ by the server; ownership belongs with its internal - // shutdown callback tag (invoked when the CQ is fully shutdown). - // It is protected by mu_ - CompletionQueue* callback_cq_ = nullptr; -}; +typedef ::grpc_impl::Server Server; } // namespace grpc diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index 498e5b7bb31..aaa8197e966 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -35,12 +35,15 @@ struct grpc_resource_quota; +namespace grpc_impl { + +class Server; +} // namespace grpc_impl namespace grpc { class AsyncGenericService; class ResourceQuota; class CompletionQueue; -class Server; class ServerCompletionQueue; class ServerCredentials; class Service; @@ -70,7 +73,7 @@ class ServerBuilder { /// traffic (via AddListeningPort) /// 3. [for async api only] completion queues have been added via /// AddCompletionQueue - virtual std::unique_ptr BuildAndStart(); + virtual std::unique_ptr BuildAndStart(); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the \a Server instance returned diff --git a/include/grpcpp/server_impl.h b/include/grpcpp/server_impl.h new file mode 100644 index 00000000000..d27723a01fe --- /dev/null +++ b/include/grpcpp/server_impl.h @@ -0,0 +1,353 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SERVER_IMPL_H +#define GRPCPP_SERVER_IMPL_H + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct grpc_server; + +namespace grpc { + +class AsyncGenericService; +class HealthCheckServiceInterface; +class ServerContext; +class ServerInitializer; + +} // namespace grpc + +namespace grpc_impl { + +/// Represents a gRPC server. +/// +/// Use a \a grpc::ServerBuilder to create, configure, and start +/// \a Server instances. +class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { + public: + ~Server(); + + /// Block until the server shuts down. + /// + /// \warning The server must be either shutting down or some other thread must + /// call \a Shutdown for this function to ever return. + void Wait() override; + + /// Global callbacks are a set of hooks that are called when server + /// events occur. \a SetGlobalCallbacks method is used to register + /// the hooks with gRPC. Note that + /// the \a GlobalCallbacks instance will be shared among all + /// \a Server instances in an application and can be set exactly + /// once per application. + class GlobalCallbacks { + public: + virtual ~GlobalCallbacks() {} + /// Called before server is created. + virtual void UpdateArguments(grpc::ChannelArguments* args) {} + /// Called before application callback for each synchronous server request + virtual void PreSynchronousRequest(grpc::ServerContext* context) = 0; + /// Called after application callback for each synchronous server request + virtual void PostSynchronousRequest(grpc::ServerContext* context) = 0; + /// Called before server is started. + virtual void PreServerStart(Server* server) {} + /// Called after a server port is added. + virtual void AddPort(Server* server, const grpc::string& addr, + grpc::ServerCredentials* creds, int port) {} + }; + /// Set the global callback object. Can only be called once per application. + /// Does not take ownership of callbacks, and expects the pointed to object + /// to be alive until all server objects in the process have been destroyed. + /// The same \a GlobalCallbacks object will be used throughout the + /// application and is shared among all \a Server objects. + static void SetGlobalCallbacks(GlobalCallbacks* callbacks); + + /// Returns a \em raw pointer to the underlying \a grpc_server instance. + /// EXPERIMENTAL: for internal/test use only + grpc_server* c_server(); + + /// Returns the health check service. + grpc::HealthCheckServiceInterface* GetHealthCheckService() const { + return health_check_service_.get(); + } + + /// Establish a channel for in-process communication + std::shared_ptr InProcessChannel(const grpc::ChannelArguments& args); + + /// NOTE: class experimental_type is not part of the public API of this class. + /// TODO(yashykt): Integrate into public API when this is no longer + /// experimental. + class experimental_type { + public: + explicit experimental_type(Server* server) : server_(server) {} + + /// Establish a channel for in-process communication with client + /// interceptors + std::shared_ptr InProcessChannelWithInterceptors( + const grpc::ChannelArguments& args, + std::vector< + std::unique_ptr> + interceptor_creators); + + private: + Server* server_; + }; + + /// NOTE: The function experimental() is not stable public API. It is a view + /// to the experimental components of this class. It may be changed or removed + /// at any time. + experimental_type experimental() { return experimental_type(this); } + + protected: + /// Register a service. This call does not take ownership of the service. + /// The service must exist for the lifetime of the Server instance. + bool RegisterService(const grpc::string* host, grpc::Service* service) override; + + /// Try binding the server to the given \a addr endpoint + /// (port, and optionally including IP address to bind to). + /// + /// It can be invoked multiple times. Should be used before + /// starting the server. + /// + /// \param addr The address to try to bind to the server (eg, localhost:1234, + /// 192.168.1.1:31416, [::1]:27182, etc.). + /// \param creds The credentials associated with the server. + /// + /// \return bound port number on success, 0 on failure. + /// + /// \warning It is an error to call this method on an already started server. + int AddListeningPort(const grpc::string& addr, + grpc::ServerCredentials* creds) override; + + /// NOTE: This is *NOT* a public API. The server constructors are supposed to + /// be used by \a ServerBuilder class only. The constructor will be made + /// 'private' very soon. + /// + /// Server constructors. To be used by \a ServerBuilder only. + /// + /// \param max_message_size Maximum message length that the channel can + /// receive. + /// + /// \param args The channel args + /// + /// \param sync_server_cqs The completion queues to use if the server is a + /// synchronous server (or a hybrid server). The server polls for new RPCs on + /// these queues + /// + /// \param min_pollers The minimum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param max_pollers The maximum number of polling threads per server + /// completion queue (in param sync_server_cqs) to use for listening to + /// incoming requests (used only in case of sync server) + /// + /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on + /// server completion queues passed via sync_server_cqs param. + Server(int max_message_size, grpc::ChannelArguments* args, + std::shared_ptr>> + sync_server_cqs, + int min_pollers, int max_pollers, int sync_cq_timeout_msec, + grpc_resource_quota* server_rq = nullptr, + std::vector< + std::unique_ptr> + interceptor_creators = std::vector>()); + + /// Start the server. + /// + /// \param cqs Completion queues for handling asynchronous services. The + /// caller is required to keep all completion queues live until the server is + /// destroyed. + /// \param num_cqs How many completion queues does \a cqs hold. + void Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) override; + + grpc_server* server() override { return server_; } + + private: + std::vector>* + interceptor_creators() override { + return &interceptor_creators_; + } + + friend class grpc::AsyncGenericService; + friend class grpc::ServerBuilder; + friend class grpc::ServerInitializer; + + class SyncRequest; + class CallbackRequestBase; + template + class CallbackRequest; + class UnimplementedAsyncRequest; + class UnimplementedAsyncResponse; + + /// SyncRequestThreadManager is an implementation of ThreadManager. This class + /// is responsible for polling for incoming RPCs and calling the RPC handlers. + /// This is only used in case of a Sync server (i.e a server exposing a sync + /// interface) + class SyncRequestThreadManager; + + /// Register a generic service. This call does not take ownership of the + /// service. The service must exist for the lifetime of the Server instance. + void RegisterAsyncGenericService(grpc::AsyncGenericService* service) override; + + /// NOTE: class experimental_registration_type is not part of the public API + /// of this class + /// TODO(vjpai): Move these contents to the public API of Server when + /// they are no longer experimental + class experimental_registration_type final + : public experimental_registration_interface { + public: + explicit experimental_registration_type(Server* server) : server_(server) {} + void RegisterCallbackGenericService( + grpc::experimental::CallbackGenericService* service) override { + server_->RegisterCallbackGenericService(service); + } + + private: + Server* server_; + }; + + /// TODO(vjpai): Mark this override when experimental type above is deleted + void RegisterCallbackGenericService( + grpc::experimental::CallbackGenericService* service); + + /// NOTE: The function experimental_registration() is not stable public API. + /// It is a view to the experimental components of this class. It may be + /// changed or removed at any time. + experimental_registration_interface* experimental_registration() override { + return &experimental_registration_; + } + + void PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, + grpc::internal::Call* call) override; + + void ShutdownInternal(gpr_timespec deadline) override; + + int max_receive_message_size() const override { + return max_receive_message_size_; + } + + grpc::CompletionQueue* CallbackCQ() override; + + grpc::ServerInitializer* initializer(); + + // A vector of interceptor factory objects. + // This should be destroyed after health_check_service_ and this requirement + // is satisfied by declaring interceptor_creators_ before + // health_check_service_. (C++ mandates that member objects be destroyed in + // the reverse order of initialization.) + std::vector> + interceptor_creators_; + + const int max_receive_message_size_; + + /// The following completion queues are ONLY used in case of Sync API + /// i.e. if the server has any services with sync methods. The server uses + /// these completion queues to poll for new RPCs + std::shared_ptr>> + sync_server_cqs_; + + /// List of \a ThreadManager instances (one for each cq in + /// the \a sync_server_cqs) + std::vector> sync_req_mgrs_; + + // Outstanding unmatched callback requests, indexed by method. + // NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't + // copyable or movable and thus will cause compilation errors. We + // actually only want to extend the vector before the threaded use + // starts, but this is still a limitation. + std::vector callback_unmatched_reqs_count_; + + // List of callback requests to start when server actually starts. + std::list callback_reqs_to_start_; + + // For registering experimental callback generic service; remove when that + // method longer experimental + experimental_registration_type experimental_registration_{this}; + + // Server status + std::mutex mu_; + bool started_; + bool shutdown_; + bool shutdown_notified_; // Was notify called on the shutdown_cv_ + + std::condition_variable shutdown_cv_; + + // It is ok (but not required) to nest callback_reqs_mu_ under mu_ . + // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be + // decremented under the lock in case it is the last request and enables the + // server shutdown. The increment is performance-critical since it happens + // during periods of increasing load; the decrement happens only when memory + // is maxed out, during server shutdown, or (possibly in a future version) + // during decreasing load, so it is less performance-critical. + std::mutex callback_reqs_mu_; + std::condition_variable callback_reqs_done_cv_; + std::atomic_int callback_reqs_outstanding_{0}; + + std::shared_ptr global_callbacks_; + + std::vector services_; + bool has_async_generic_service_{false}; + bool has_callback_generic_service_{false}; + + // Pointer to the wrapped grpc_server. + grpc_server* server_; + + std::unique_ptr server_initializer_; + + std::unique_ptr health_check_service_; + bool health_check_service_disabled_; + + // When appropriate, use a default callback generic service to handle + // unimplemented methods + std::unique_ptr unimplemented_service_; + + // A special handler for resource exhausted in sync case + std::unique_ptr resource_exhausted_handler_; + + // Handler for callback generic service, if any + std::unique_ptr generic_handler_; + + // callback_cq_ references the callbackable completion queue associated + // with this server (if any). It is set on the first call to CallbackCQ(). + // It is _not owned_ by the server; ownership belongs with its internal + // shutdown callback tag (invoked when the CQ is fully shutdown). + // It is protected by mu_ + grpc::CompletionQueue* callback_cq_ = nullptr; +}; + +} // namespace grpc_impl + +#endif // GRPCPP_SERVER_IMPL_H diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 26e84f1aed4..74d89ff18fc 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -106,15 +106,183 @@ class UnimplementedAsyncRequestContext { } // namespace +ServerInterface::BaseAsyncRequest::BaseAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : server_(server), + context_(context), + stream_(stream), + call_cq_(call_cq), + notification_cq_(notification_cq), + tag_(tag), + delete_on_finalize_(delete_on_finalize), + call_(nullptr), + done_intercepting_(false) { + /* Set up interception state partially for the receive ops. call_wrapper_ is + * not filled at this point, but it will be filled before the interceptors are + * run. */ + interceptor_methods_.SetCall(&call_wrapper_); + interceptor_methods_.SetReverse(); + call_cq_->RegisterAvalanching(); // This op will trigger more ops +} + +ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { + call_cq_->CompleteAvalanching(); +} + +bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (done_intercepting_) { + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; + } + context_->set_call(call_); + context_->cq_ = call_cq_; + if (call_wrapper_.call() == nullptr) { + // Fill it since it is empty. + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); + } + + // just the pointers inside call are copied here + stream_->BindCall(&call_wrapper_); + + if (*status && call_ && call_wrapper_.server_rpc_info()) { + done_intercepting_ = true; + // Set interception point for RECV INITIAL METADATA + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); + if (interceptor_methods_.RunInterceptors( + [this]() { ContinueFinalizeResultAfterInterception(); })) { + // There are no interceptors to run. Continue + } else { + // There were interceptors to be run, so + // ContinueFinalizeResultAfterInterception will be run when interceptors + // are done. + return false; + } + } + if (*status && call_) { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + } + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; +} + +void ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception() { + context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); + // Queue a tag which will be returned immediately + grpc_core::ExecCtx exec_ctx; + grpc_cq_begin_op(notification_cq_->cq(), this); + grpc_cq_end_op( + notification_cq_->cq(), this, GRPC_ERROR_NONE, + [](void* arg, grpc_cq_completion* completion) { delete completion; }, + nullptr, new grpc_cq_completion()); +} + +ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( + ServerInterface* server, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, const char* name, + internal::RpcMethod::RpcType type) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + true), + name_(name), + type_(type) {} + +void ServerInterface::RegisteredAsyncRequest::IssueRequest( + void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq) { + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( + server_->server(), registered_method, &call_, + &context_->deadline_, + context_->client_metadata_.arr(), payload, + call_cq_->cq(), notification_cq->cq(), this)); +} + +ServerInterface::GenericAsyncRequest::GenericAsyncRequest( + ServerInterface* server, GenericServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + delete_on_finalize) { + grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server->server(), &call_, &call_details_, + context->client_metadata_.arr(), call_cq->cq(), + notification_cq->cq(), this)); +} + +bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, + bool* status) { + // If we are done intercepting, there is nothing more for us to do + if (done_intercepting_) { + return BaseAsyncRequest::FinalizeResult(tag, status); + } + // TODO(yangg) remove the copy here. + if (*status) { + static_cast(context_)->method_ = + StringFromCopiedSlice(call_details_.method); + static_cast(context_)->host_ = + StringFromCopiedSlice(call_details_.host); + context_->deadline_ = call_details_.deadline; + } + grpc_slice_unref(call_details_.method); + grpc_slice_unref(call_details_.host); + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), + context_->set_server_rpc_info( + static_cast(context_)->method_.c_str(), + internal::RpcMethod::BIDI_STREAMING, + *server_->interceptor_creators())); + return BaseAsyncRequest::FinalizeResult(tag, status); +} + +namespace { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { + public: + ShutdownCallback() { functor_run = &ShutdownCallback::Run; } + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast(cb); + delete callback->cq_; + delete callback; + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + +} // namespace grpc + +namespace grpc_impl { + /// Use private inheritance rather than composition only to establish order /// of construction, since the public base class should be constructed after the /// elements belonging to the private base class are constructed. This is not /// possible using true composition. class Server::UnimplementedAsyncRequest final - : private UnimplementedAsyncRequestContext, + : private grpc::UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: - UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) + UnimplementedAsyncRequest(Server* server, grpc::ServerCompletionQueue* cq) : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, nullptr, false), server_(server), @@ -122,27 +290,27 @@ class Server::UnimplementedAsyncRequest final bool FinalizeResult(void** tag, bool* status) override; - ServerContext* context() { return &server_context_; } - GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + grpc::ServerContext* context() { return &server_context_; } + grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } private: Server* const server_; - ServerCompletionQueue* const cq_; + grpc::ServerCompletionQueue* const cq_; }; /// UnimplementedAsyncResponse should not post user-visible completions to the /// C++ completion queue, but is generated as a CQ event by the core class Server::UnimplementedAsyncResponse final - : public internal::CallOpSet { + : public grpc::internal::CallOpSet { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - if (internal::CallOpSet< - internal::CallOpSendInitialMetadata, - internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { + if (grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { delete this; } else { // The tag was swallowed due to interception. We will see it again. @@ -154,15 +322,15 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class Server::SyncRequest final : public internal::CompletionQueueTag { +class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { public: - SyncRequest(internal::RpcServiceMethod* method, void* method_tag) + SyncRequest(grpc::internal::RpcServiceMethod* method, void* method_tag) : method_(method), method_tag_(method_tag), in_flight_(false), has_request_payload_( - method->method_type() == internal::RpcMethod::NORMAL_RPC || - method->method_type() == internal::RpcMethod::SERVER_STREAMING), + method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -273,7 +441,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); if (has_request_payload_) { @@ -285,7 +453,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); interceptor_methods_.SetRecvMessage(request_, nullptr); } @@ -304,40 +472,40 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); - handler->RunHandler(internal::MethodHandler::HandlerParameter( + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_, request_status_, nullptr)); request_ = nullptr; global_callbacks_->PostSynchronousRequest(&ctx_); cq_.Shutdown(); - internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + grpc::internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ - DummyTag ignored_tag; + grpc::DummyTag ignored_tag; GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); } delete this; } private: - CompletionQueue cq_; - ServerContext ctx_; + grpc::CompletionQueue cq_; + grpc::ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; - Status request_status_; - internal::RpcServiceMethod* const method_; - internal::Call call_; + grpc::Status request_status_; + grpc::internal::RpcServiceMethod* const method_; + grpc::internal::Call call_; Server* server_; std::shared_ptr global_callbacks_; bool resources_; - internal::InterceptorBatchMethodsImpl interceptor_methods_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: - internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; void* const method_tag_; bool in_flight_; const bool has_request_payload_; @@ -349,7 +517,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { grpc_completion_queue* cq_; }; -class Server::CallbackRequestBase : public internal::CompletionQueueTag { +class Server::CallbackRequestBase : public grpc::internal::CompletionQueueTag { public: virtual ~CallbackRequestBase() {} virtual bool Request() = 0; @@ -358,7 +526,7 @@ class Server::CallbackRequestBase : public internal::CompletionQueueTag { template class Server::CallbackRequest final : public Server::CallbackRequestBase { public: - static_assert(std::is_base_of::value, + static_assert(std::is_base_of::value, "ServerContextType must be derived from ServerContext"); // The constructor needs to know the server for this callback request and its @@ -368,15 +536,15 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { // requested. For generic services, method and method_tag are nullptr since // these services don't have pre-defined methods or method registration tags. CallbackRequest(Server* server, size_t method_idx, - internal::RpcServiceMethod* method, void* method_tag) + grpc::internal::RpcServiceMethod* method, void* method_tag) : server_(server), method_index_(method_idx), method_(method), method_tag_(method_tag), has_request_payload_( method_ != nullptr && - (method->method_type() == internal::RpcMethod::NORMAL_RPC || - method->method_type() == internal::RpcMethod::SERVER_STREAMING)), + (method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING)), cq_(server->CallbackCQ()), tag_(this) { server_->callback_reqs_outstanding_++; @@ -440,7 +608,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { private: Server::CallbackRequest* req_; - internal::Call* call_; + grpc::internal::Call* call_; static void StaticRun(grpc_experimental_completion_queue_functor* cb, int ok) { @@ -491,21 +659,21 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { req_->request_metadata_.count = 0; // Create a C++ Call to control the underlying core call - call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(internal::Call))) - internal::Call(req_->call_, req_->server_, req_->cq_, + call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) + grpc::internal::Call(req_->call_, req_->server_, req_->cq_, req_->server_->max_receive_message_size(), req_->ctx_.set_server_rpc_info( req_->method_name(), (req_->method_ != nullptr) ? req_->method_->method_type() - : internal::RpcMethod::BIDI_STREAMING, + : grpc::internal::RpcMethod::BIDI_STREAMING, req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA req_->interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); req_->interceptor_methods_.SetRecvInitialMetadata( &req_->ctx_.client_metadata_); @@ -515,7 +683,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { req_->call_, req_->request_payload_, &req_->request_status_); req_->request_payload_ = nullptr; req_->interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE); req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr); } @@ -531,7 +699,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { auto* handler = (req_->method_ != nullptr) ? req_->method_->handler() : req_->server_->generic_handler_.get(); - handler->RunHandler(internal::MethodHandler::HandlerParameter( + handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, [this] { // Recycle this request if there aren't too many outstanding. // Note that we don't have to worry about a case where there @@ -577,40 +745,40 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); request_payload_ = nullptr; request_ = nullptr; - request_status_ = Status(); + request_status_ = grpc::Status(); } Server* const server_; const size_t method_index_; - internal::RpcServiceMethod* const method_; + grpc::internal::RpcServiceMethod* const method_; void* const method_tag_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; - Status request_status_; + grpc::Status request_status_; grpc_call_details* call_details_ = nullptr; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; - CompletionQueue* cq_; + grpc::CompletionQueue* cq_; CallbackCallTag tag_; ServerContextType ctx_; - internal::InterceptorBatchMethodsImpl interceptor_methods_; + grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_; }; template <> -bool Server::CallbackRequest::FinalizeResult(void** tag, +bool Server::CallbackRequest::FinalizeResult(void** tag, bool* status) { return false; } template <> -bool Server::CallbackRequest::FinalizeResult( +bool Server::CallbackRequest::FinalizeResult( void** tag, bool* status) { if (*status) { // TODO(yangg) remove the copy here - ctx_.method_ = StringFromCopiedSlice(call_details_->method); - ctx_.host_ = StringFromCopiedSlice(call_details_->host); + ctx_.method_ = grpc::StringFromCopiedSlice(call_details_->method); + ctx_.host_ = grpc::StringFromCopiedSlice(call_details_->host); } grpc_slice_unref(call_details_->method); grpc_slice_unref(call_details_->host); @@ -618,21 +786,21 @@ bool Server::CallbackRequest::FinalizeResult( } template <> -const char* Server::CallbackRequest::method_name() const { +const char* Server::CallbackRequest::method_name() const { return method_->name(); } template <> -const char* Server::CallbackRequest::method_name() const { +const char* Server::CallbackRequest::method_name() const { return ctx_.method().c_str(); } // Implementation of ThreadManager. Each instance of SyncRequestThreadManager // manages a pool of threads that poll for incoming Sync RPCs and call the // appropriate RPC handlers -class Server::SyncRequestThreadManager : public ThreadManager { +class Server::SyncRequestThreadManager : public grpc::ThreadManager { public: - SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, + SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq, std::shared_ptr global_callbacks, grpc_resource_quota* rq, int min_pollers, int max_pollers, int cq_timeout_msec) @@ -651,11 +819,11 @@ class Server::SyncRequestThreadManager : public ThreadManager { gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); switch (server_cq_->AsyncNext(tag, ok, deadline)) { - case CompletionQueue::TIMEOUT: + case grpc::CompletionQueue::TIMEOUT: return TIMEOUT; - case CompletionQueue::SHUTDOWN: + case grpc::CompletionQueue::SHUTDOWN: return SHUTDOWN; - case CompletionQueue::GOT_EVENT: + case grpc::CompletionQueue::GOT_EVENT: return WORK_FOUND; } @@ -690,15 +858,15 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { + void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new internal::RpcServiceMethod( - "unknown", internal::RpcMethod::BIDI_STREAMING, - new internal::UnknownMethodHandler)); + unknown_method_.reset(new grpc::internal::RpcServiceMethod( + "unknown", grpc::internal::RpcMethod::BIDI_STREAMING, + new grpc::internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -742,22 +910,22 @@ class Server::SyncRequestThreadManager : public ThreadManager { private: Server* server_; - CompletionQueue* server_cq_; + grpc::CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector> sync_requests_; - std::unique_ptr unknown_method_; + std::unique_ptr unknown_method_; std::shared_ptr global_callbacks_; }; -static internal::GrpcLibraryInitializer g_gli_initializer; +static grpc::internal::GrpcLibraryInitializer g_gli_initializer; Server::Server( - int max_receive_message_size, ChannelArguments* args, - std::shared_ptr>> + int max_receive_message_size, grpc::ChannelArguments* args, + std::shared_ptr>> sync_server_cqs, int min_pollers, int max_pollers, int sync_cq_timeout_msec, grpc_resource_quota* server_rq, std::vector< - std::unique_ptr> + std::unique_ptr> interceptor_creators) : interceptor_creators_(std::move(interceptor_creators)), max_receive_message_size_(max_receive_message_size), @@ -766,11 +934,11 @@ Server::Server( shutdown_(false), shutdown_notified_(false), server_(nullptr), - server_initializer_(new ServerInitializer(this)), + server_initializer_(new grpc::ServerInitializer(this)), health_check_service_disabled_(false) { g_gli_initializer.summon(); - gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); - global_callbacks_ = g_callbacks; + gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks); + global_callbacks_ = grpc::g_callbacks; global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { @@ -798,11 +966,11 @@ Server::Server( for (size_t i = 0; i < channel_args.num_args; i++) { if (0 == - strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) { + strcmp(channel_args.args[i].key, grpc::kHealthCheckServiceInterfaceArg)) { if (channel_args.args[i].value.pointer.p == nullptr) { health_check_service_disabled_ = true; } else { - health_check_service_.reset(static_cast( + health_check_service_.reset(static_cast( channel_args.args[i].value.pointer.p)); } break; @@ -840,49 +1008,49 @@ Server::~Server() { } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { - GPR_ASSERT(!g_callbacks); + GPR_ASSERT(!grpc::g_callbacks); GPR_ASSERT(callbacks); - g_callbacks.reset(callbacks); + grpc::g_callbacks.reset(callbacks); } grpc_server* Server::c_server() { return server_; } -std::shared_ptr Server::InProcessChannel( - const ChannelArguments& args) { +std::shared_ptr Server::InProcessChannel( + const grpc::ChannelArguments& args) { grpc_channel_args channel_args = args.c_channel_args(); - return CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), std::vector< - std::unique_ptr>()); + std::unique_ptr>()); } -std::shared_ptr +std::shared_ptr Server::experimental_type::InProcessChannelWithInterceptors( - const ChannelArguments& args, + const grpc::ChannelArguments& args, std::vector< - std::unique_ptr> + std::unique_ptr> interceptor_creators) { grpc_channel_args channel_args = args.c_channel_args(); - return CreateChannelInternal( + return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_->server_, &channel_args, nullptr), std::move(interceptor_creators)); } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - internal::RpcServiceMethod* method) { + grpc::internal::RpcServiceMethod* method) { switch (method->method_type()) { - case internal::RpcMethod::NORMAL_RPC: - case internal::RpcMethod::SERVER_STREAMING: + case grpc::internal::RpcMethod::NORMAL_RPC: + case grpc::internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case internal::RpcMethod::CLIENT_STREAMING: - case internal::RpcMethod::BIDI_STREAMING: + case grpc::internal::RpcMethod::CLIENT_STREAMING: + case grpc::internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); } -bool Server::RegisterService(const grpc::string* host, Service* service) { +bool Server::RegisterService(const grpc::string* host, grpc::Service* service) { bool has_async_methods = service->has_async_methods(); if (has_async_methods) { GPR_ASSERT(service->server_ == nullptr && @@ -898,7 +1066,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - internal::RpcServiceMethod* method = it->get(); + grpc::internal::RpcServiceMethod* method = it->get(); void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -911,7 +1079,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { if (method->handler() == nullptr) { // Async method without handler method->set_server_tag(method_registration_tag); } else if (method->api_type() == - internal::RpcServiceMethod::ApiType::SYNC) { + grpc::internal::RpcServiceMethod::ApiType::SYNC) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->AddSyncMethod(method, method_registration_tag); } @@ -921,7 +1089,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { auto method_index = callback_unmatched_reqs_count_.size() - 1; // TODO(vjpai): Register these dynamically based on need for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - callback_reqs_to_start_.push_back(new CallbackRequest( + callback_reqs_to_start_.push_back(new CallbackRequest( this, method_index, method, method_registration_tag)); } // Enqueue it so that it will be Request'ed later after all request @@ -943,7 +1111,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { return true; } -void Server::RegisterAsyncGenericService(AsyncGenericService* service) { +void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; @@ -951,7 +1119,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { } void Server::RegisterCallbackGenericService( - experimental::CallbackGenericService* service) { + grpc::experimental::CallbackGenericService* service) { GPR_ASSERT( service->server_ == nullptr && "Can only register a callback generic service against one server."); @@ -963,44 +1131,44 @@ void Server::RegisterCallbackGenericService( auto method_index = callback_unmatched_reqs_count_.size() - 1; // TODO(vjpai): Register these dynamically based on need for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - callback_reqs_to_start_.push_back(new CallbackRequest( + callback_reqs_to_start_.push_back(new CallbackRequest( this, method_index, nullptr, nullptr)); } } int Server::AddListeningPort(const grpc::string& addr, - ServerCredentials* creds) { + grpc::ServerCredentials* creds) { GPR_ASSERT(!started_); int port = creds->AddPortToServer(addr, server_); global_callbacks_->AddPort(this, addr, creds, port); return port; } -void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { +void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); global_callbacks_->PreServerStart(this); started_ = true; // Only create default health check service when user did not provide an // explicit one. - ServerCompletionQueue* health_check_cq = nullptr; - DefaultHealthCheckService::HealthCheckServiceImpl* + grpc::ServerCompletionQueue* health_check_cq = nullptr; + grpc::DefaultHealthCheckService::HealthCheckServiceImpl* default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && - DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new DefaultHealthCheckService; + grpc::DefaultHealthCheckServiceEnabled()) { + auto* default_hc_service = new grpc::DefaultHealthCheckService; health_check_service_.reset(default_hc_service); // We create a non-polling CQ to avoid impacting application // performance. This ensures that we don't introduce thread hops // for application requests that wind up on this CQ, which is polled // in its own thread. health_check_cq = - new ServerCompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); + new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl = default_hc_service->GetHealthCheckService( - std::unique_ptr(health_check_cq)); + std::unique_ptr(health_check_cq)); RegisterService(nullptr, default_health_check_service_impl); } @@ -1008,7 +1176,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // service to handle any unimplemented methods using the default reactor // creator if (!callback_reqs_to_start_.empty() && !has_callback_generic_service_) { - unimplemented_service_.reset(new experimental::CallbackGenericService); + unimplemented_service_.reset(new grpc::experimental::CallbackGenericService); RegisterCallbackGenericService(unimplemented_service_.get()); } @@ -1033,7 +1201,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // server CQs), make sure that we have a ResourceExhausted handler // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler); + resource_exhausted_handler_.reset(new grpc::internal::ResourceExhaustedHandler); } for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { @@ -1059,20 +1227,20 @@ void Server::ShutdownInternal(gpr_timespec deadline) { shutdown_ = true; /// The completion queue to use for server shutdown completion notification - CompletionQueue shutdown_cq; - ShutdownTag shutdown_tag; // Dummy shutdown tag + grpc::CompletionQueue shutdown_cq; + grpc::ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); shutdown_cq.Shutdown(); void* tag; bool ok; - CompletionQueue::NextStatus status = + grpc::CompletionQueue::NextStatus status = shutdown_cq.AsyncNext(&tag, &ok, deadline); // If this timed out, it means we are done with the grace period for a clean // shutdown. We should force a shutdown now by cancelling all inflight calls - if (status == CompletionQueue::NextStatus::TIMEOUT) { + if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) { grpc_server_cancel_all_calls(server_); } // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has @@ -1124,154 +1292,11 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, - internal::Call* call) { +void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, + grpc::internal::Call* call) { ops->FillOps(call); } -ServerInterface::BaseAsyncRequest::BaseAsyncRequest( - ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : server_(server), - context_(context), - stream_(stream), - call_cq_(call_cq), - notification_cq_(notification_cq), - tag_(tag), - delete_on_finalize_(delete_on_finalize), - call_(nullptr), - done_intercepting_(false) { - /* Set up interception state partially for the receive ops. call_wrapper_ is - * not filled at this point, but it will be filled before the interceptors are - * run. */ - interceptor_methods_.SetCall(&call_wrapper_); - interceptor_methods_.SetReverse(); - call_cq_->RegisterAvalanching(); // This op will trigger more ops -} - -ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { - call_cq_->CompleteAvalanching(); -} - -bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, - bool* status) { - if (done_intercepting_) { - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; - } - context_->set_call(call_); - context_->cq_ = call_cq_; - if (call_wrapper_.call() == nullptr) { - // Fill it since it is empty. - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); - } - - // just the pointers inside call are copied here - stream_->BindCall(&call_wrapper_); - - if (*status && call_ && call_wrapper_.server_rpc_info()) { - done_intercepting_ = true; - // Set interception point for RECV INITIAL METADATA - interceptor_methods_.AddInterceptionHookPoint( - experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); - interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); - if (interceptor_methods_.RunInterceptors( - [this]() { ContinueFinalizeResultAfterInterception(); })) { - // There are no interceptors to run. Continue - } else { - // There were interceptors to be run, so - // ContinueFinalizeResultAfterInterception will be run when interceptors - // are done. - return false; - } - } - if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - } - *tag = tag_; - if (delete_on_finalize_) { - delete this; - } - return true; -} - -void ServerInterface::BaseAsyncRequest:: - ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr); - // Queue a tag which will be returned immediately - grpc_core::ExecCtx exec_ctx; - grpc_cq_begin_op(notification_cq_->cq(), this); - grpc_cq_end_op( - notification_cq_->cq(), this, GRPC_ERROR_NONE, - [](void* arg, grpc_cq_completion* completion) { delete completion; }, - nullptr, new grpc_cq_completion()); -} - -ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( - ServerInterface* server, ServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, const char* name, - internal::RpcMethod::RpcType type) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - true), - name_(name), - type_(type) {} - -void ServerInterface::RegisteredAsyncRequest::IssueRequest( - void* registered_method, grpc_byte_buffer** payload, - ServerCompletionQueue* notification_cq) { - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( - server_->server(), registered_method, &call_, - &context_->deadline_, - context_->client_metadata_.arr(), payload, - call_cq_->cq(), notification_cq->cq(), this)); -} - -ServerInterface::GenericAsyncRequest::GenericAsyncRequest( - ServerInterface* server, GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, - delete_on_finalize) { - grpc_call_details_init(&call_details_); - GPR_ASSERT(notification_cq); - GPR_ASSERT(call_cq); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( - server->server(), &call_, &call_details_, - context->client_metadata_.arr(), call_cq->cq(), - notification_cq->cq(), this)); -} - -bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, - bool* status) { - // If we are done intercepting, there is nothing more for us to do - if (done_intercepting_) { - return BaseAsyncRequest::FinalizeResult(tag, status); - } - // TODO(yangg) remove the copy here. - if (*status) { - static_cast(context_)->method_ = - StringFromCopiedSlice(call_details_.method); - static_cast(context_)->host_ = - StringFromCopiedSlice(call_details_.host); - context_->deadline_ = call_details_.deadline; - } - grpc_slice_unref(call_details_.method); - grpc_slice_unref(call_details_.host); - call_wrapper_ = internal::Call( - call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info( - static_cast(context_)->method_.c_str(), - internal::RpcMethod::BIDI_STREAMING, - *server_->interceptor_creators())); - return BaseAsyncRequest::FinalizeResult(tag, status); -} - bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, bool* status) { if (GenericAsyncRequest::FinalizeResult(tag, status)) { @@ -1291,41 +1316,20 @@ bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { - Status status(StatusCode::UNIMPLEMENTED, ""); - internal::UnknownMethodHandler::FillOps(request_->context(), this); + grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, ""); + grpc::internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } -ServerInitializer* Server::initializer() { return server_initializer_.get(); } - -namespace { -class ShutdownCallback : public grpc_experimental_completion_queue_functor { - public: - ShutdownCallback() { functor_run = &ShutdownCallback::Run; } - // TakeCQ takes ownership of the cq into the shutdown callback - // so that the shutdown callback will be responsible for destroying it - void TakeCQ(CompletionQueue* cq) { cq_ = cq; } - - // The Run function will get invoked by the completion queue library - // when the shutdown is actually complete - static void Run(grpc_experimental_completion_queue_functor* cb, int) { - auto* callback = static_cast(cb); - delete callback->cq_; - delete callback; - } - - private: - CompletionQueue* cq_ = nullptr; -}; -} // namespace +grpc::ServerInitializer* Server::initializer() { return server_initializer_.get(); } -CompletionQueue* Server::CallbackCQ() { +grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered std::lock_guard l(mu_); if (callback_cq_ == nullptr) { - auto* shutdown_callback = new ShutdownCallback; - callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ + auto* shutdown_callback = new grpc::ShutdownCallback; + callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); @@ -1335,4 +1339,4 @@ CompletionQueue* Server::CallbackCQ() { return callback_cq_; } -} // namespace grpc +} // namespace grpc_impl diff --git a/test/cpp/util/metrics_server.h b/test/cpp/util/metrics_server.h index 2d6ddf08043..08f6034c28e 100644 --- a/test/cpp/util/metrics_server.h +++ b/test/cpp/util/metrics_server.h @@ -21,6 +21,8 @@ #include #include +#include + #include "src/proto/grpc/testing/metrics.grpc.pb.h" #include "src/proto/grpc/testing/metrics.pb.h" From dc3a715be46f481ba8cd0205496d5bd3db8008d5 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Wed, 27 Mar 2019 14:33:58 -0700 Subject: [PATCH 2/5] Fix clang format code errors. --- .../grpcpp/impl/codegen/completion_queue.h | 2 +- include/grpcpp/impl/codegen/server_context.h | 2 +- include/grpcpp/impl/codegen/service_type.h | 2 +- include/grpcpp/impl/server_initializer.h | 2 +- include/grpcpp/security/server_credentials.h | 2 +- include/grpcpp/server_builder.h | 2 +- include/grpcpp/server_impl.h | 40 +++++---- src/cpp/server/server_cc.cc | 90 +++++++++++-------- 8 files changed, 82 insertions(+), 60 deletions(-) diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 559b7b7fe47..687bccc5ee9 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -44,7 +44,7 @@ struct grpc_completion_queue; namespace grpc_impl { class Server; -} // namespace grpc_impl +} // namespace grpc_impl namespace grpc { template diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index c2959d95de7..f65598db41f 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -44,7 +44,7 @@ struct census_context; namespace grpc_impl { class Server; -} // namespace grpc_impl +} // namespace grpc_impl namespace grpc { class ClientContext; class GenericServerContext; diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h index 7ac482c19d7..1d94abe12dd 100644 --- a/include/grpcpp/impl/codegen/service_type.h +++ b/include/grpcpp/impl/codegen/service_type.h @@ -29,7 +29,7 @@ namespace grpc_impl { class Server; -} // namespace grpc_impl +} // namespace grpc_impl namespace grpc { class CompletionQueue; diff --git a/include/grpcpp/impl/server_initializer.h b/include/grpcpp/impl/server_initializer.h index bd18ee04927..85649d53df8 100644 --- a/include/grpcpp/impl/server_initializer.h +++ b/include/grpcpp/impl/server_initializer.h @@ -27,7 +27,7 @@ namespace grpc_impl { class Server; -} // namespace grpc_impl +} // namespace grpc_impl namespace grpc { class Service; diff --git a/include/grpcpp/security/server_credentials.h b/include/grpcpp/security/server_credentials.h index a519caf9913..25b25c9639b 100644 --- a/include/grpcpp/security/server_credentials.h +++ b/include/grpcpp/security/server_credentials.h @@ -31,7 +31,7 @@ struct grpc_server; namespace grpc_impl { class Server; -} // namespace grpc_impl +} // namespace grpc_impl namespace grpc { /// Wrapper around \a grpc_server_credentials, a way to authenticate a server. diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index 3b2e4d212fb..5761d19c55b 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -38,7 +38,7 @@ struct grpc_resource_quota; namespace grpc_impl { class Server; -} // namespace grpc_impl +} // namespace grpc_impl namespace grpc { class AsyncGenericService; diff --git a/include/grpcpp/server_impl.h b/include/grpcpp/server_impl.h index d27723a01fe..881abd33aa5 100644 --- a/include/grpcpp/server_impl.h +++ b/include/grpcpp/server_impl.h @@ -47,7 +47,7 @@ class HealthCheckServiceInterface; class ServerContext; class ServerInitializer; -} // namespace grpc +} // namespace grpc namespace grpc_impl { @@ -103,7 +103,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { } /// Establish a channel for in-process communication - std::shared_ptr InProcessChannel(const grpc::ChannelArguments& args); + std::shared_ptr InProcessChannel( + const grpc::ChannelArguments& args); /// NOTE: class experimental_type is not part of the public API of this class. /// TODO(yashykt): Integrate into public API when this is no longer @@ -116,8 +117,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { /// interceptors std::shared_ptr InProcessChannelWithInterceptors( const grpc::ChannelArguments& args, - std::vector< - std::unique_ptr> + std::vector> interceptor_creators); private: @@ -132,7 +133,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { protected: /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. - bool RegisterService(const grpc::string* host, grpc::Service* service) override; + bool RegisterService(const grpc::string* host, + grpc::Service* service) override; /// Try binding the server to the given \a addr endpoint /// (port, and optionally including IP address to bind to). @@ -175,15 +177,16 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { /// /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on /// server completion queues passed via sync_server_cqs param. - Server(int max_message_size, grpc::ChannelArguments* args, - std::shared_ptr>> - sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, - grpc_resource_quota* server_rq = nullptr, - std::vector< - std::unique_ptr> - interceptor_creators = std::vector>()); + Server( + int max_message_size, grpc::ChannelArguments* args, + std::shared_ptr>> + sync_server_cqs, + int min_pollers, int max_pollers, int sync_cq_timeout_msec, + grpc_resource_quota* server_rq = nullptr, + std::vector> + interceptor_creators = std::vector>()); /// Start the server. /// @@ -196,7 +199,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { grpc_server* server() override { return server_; } private: - std::vector>* + std::vector< + std::unique_ptr>* interceptor_creators() override { return &interceptor_creators_; } @@ -268,7 +272,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { // is satisfied by declaring interceptor_creators_ before // health_check_service_. (C++ mandates that member objects be destroyed in // the reverse order of initialization.) - std::vector> + std::vector< + std::unique_ptr> interceptor_creators_; const int max_receive_message_size_; @@ -332,7 +337,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { // When appropriate, use a default callback generic service to handle // unimplemented methods - std::unique_ptr unimplemented_service_; + std::unique_ptr + unimplemented_service_; // A special handler for resource exhausted in sync case std::unique_ptr resource_exhausted_handler_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 74d89ff18fc..90f203f3820 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -270,7 +270,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { }; } // namespace -} // namespace grpc +} // namespace grpc namespace grpc_impl { @@ -301,8 +301,9 @@ class Server::UnimplementedAsyncRequest final /// UnimplementedAsyncResponse should not post user-visible completions to the /// C++ completion queue, but is generated as a CQ event by the core class Server::UnimplementedAsyncResponse final - : public grpc::internal::CallOpSet { + : public grpc::internal::CallOpSet< + grpc::internal::CallOpSendInitialMetadata, + grpc::internal::CallOpServerSendStatus> { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } @@ -310,7 +311,8 @@ class Server::UnimplementedAsyncResponse final bool FinalizeResult(void** tag, bool* status) override { if (grpc::internal::CallOpSet< grpc::internal::CallOpSendInitialMetadata, - grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { + grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag, + status)) { delete this; } else { // The tag was swallowed due to interception. We will see it again. @@ -328,9 +330,10 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { : method_(method), method_tag_(method_tag), in_flight_(false), - has_request_payload_( - method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -441,7 +444,8 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); if (has_request_payload_) { @@ -544,7 +548,8 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { has_request_payload_( method_ != nullptr && (method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == grpc::internal::RpcMethod::SERVER_STREAMING)), + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING)), cq_(server->CallbackCQ()), tag_(this) { server_->callback_reqs_outstanding_++; @@ -659,21 +664,24 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { req_->request_metadata_.count = 0; // Create a C++ Call to control the underlying core call - call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) - grpc::internal::Call(req_->call_, req_->server_, req_->cq_, - req_->server_->max_receive_message_size(), - req_->ctx_.set_server_rpc_info( - req_->method_name(), - (req_->method_ != nullptr) - ? req_->method_->method_type() - : grpc::internal::RpcMethod::BIDI_STREAMING, - req_->server_->interceptor_creators_)); + call_ = + new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call))) + grpc::internal::Call( + req_->call_, req_->server_, req_->cq_, + req_->server_->max_receive_message_size(), + req_->ctx_.set_server_rpc_info( + req_->method_name(), + (req_->method_ != nullptr) + ? req_->method_->method_type() + : grpc::internal::RpcMethod::BIDI_STREAMING, + req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); // Set interception point for RECV INITIAL METADATA req_->interceptor_methods_.AddInterceptionHookPoint( - grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + grpc::experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA); req_->interceptor_methods_.SetRecvInitialMetadata( &req_->ctx_.client_metadata_); @@ -767,8 +775,8 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { }; template <> -bool Server::CallbackRequest::FinalizeResult(void** tag, - bool* status) { +bool Server::CallbackRequest::FinalizeResult( + void** tag, bool* status) { return false; } @@ -791,7 +799,8 @@ const char* Server::CallbackRequest::method_name() const { } template <> -const char* Server::CallbackRequest::method_name() const { +const char* Server::CallbackRequest::method_name() + const { return ctx_.method().c_str(); } @@ -965,13 +974,14 @@ Server::Server( args->SetChannelArgs(&channel_args); for (size_t i = 0; i < channel_args.num_args; i++) { - if (0 == - strcmp(channel_args.args[i].key, grpc::kHealthCheckServiceInterfaceArg)) { + if (0 == strcmp(channel_args.args[i].key, + grpc::kHealthCheckServiceInterfaceArg)) { if (channel_args.args[i].value.pointer.p == nullptr) { health_check_service_disabled_ = true; } else { - health_check_service_.reset(static_cast( - channel_args.args[i].value.pointer.p)); + health_check_service_.reset( + static_cast( + channel_args.args[i].value.pointer.p)); } break; } @@ -1020,8 +1030,8 @@ std::shared_ptr Server::InProcessChannel( grpc_channel_args channel_args = args.c_channel_args(); return grpc::CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), - std::vector< - std::unique_ptr>()); + std::vector>()); } std::shared_ptr @@ -1089,8 +1099,9 @@ bool Server::RegisterService(const grpc::string* host, grpc::Service* service) { auto method_index = callback_unmatched_reqs_count_.size() - 1; // TODO(vjpai): Register these dynamically based on need for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - callback_reqs_to_start_.push_back(new CallbackRequest( - this, method_index, method, method_registration_tag)); + callback_reqs_to_start_.push_back( + new CallbackRequest(this, method_index, method, + method_registration_tag)); } // Enqueue it so that it will be Request'ed later after all request // matchers are created at core server startup @@ -1131,8 +1142,9 @@ void Server::RegisterCallbackGenericService( auto method_index = callback_unmatched_reqs_count_.size() - 1; // TODO(vjpai): Register these dynamically based on need for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - callback_reqs_to_start_.push_back(new CallbackRequest( - this, method_index, nullptr, nullptr)); + callback_reqs_to_start_.push_back( + new CallbackRequest(this, method_index, + nullptr, nullptr)); } } @@ -1162,8 +1174,8 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // performance. This ensures that we don't introduce thread hops // for application requests that wind up on this CQ, which is polled // in its own thread. - health_check_cq = - new grpc::ServerCompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); + health_check_cq = new grpc::ServerCompletionQueue( + GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl = @@ -1176,7 +1188,8 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // service to handle any unimplemented methods using the default reactor // creator if (!callback_reqs_to_start_.empty() && !has_callback_generic_service_) { - unimplemented_service_.reset(new grpc::experimental::CallbackGenericService); + unimplemented_service_.reset( + new grpc::experimental::CallbackGenericService); RegisterCallbackGenericService(unimplemented_service_.get()); } @@ -1201,7 +1214,8 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // server CQs), make sure that we have a ResourceExhausted handler // to deal with the case of thread exhaustion if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset(new grpc::internal::ResourceExhaustedHandler); + resource_exhausted_handler_.reset( + new grpc::internal::ResourceExhaustedHandler); } for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { @@ -1321,7 +1335,9 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } -grpc::ServerInitializer* Server::initializer() { return server_initializer_.get(); } +grpc::ServerInitializer* Server::initializer() { + return server_initializer_.get(); +} grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ From 9222d4f3c23a7067f563bc3c223bd086bb9dee4c Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Thu, 4 Apr 2019 10:03:26 -0700 Subject: [PATCH 3/5] Fix clang issues --- include/grpcpp/server_builder.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index 182fdebeec9..0976b09bb8c 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -39,7 +39,7 @@ namespace grpc_impl { class Server; class ResourceQuota; -} // namespace grpc_impl +} // namespace grpc_impl namespace grpc { class AsyncGenericService; From e8c0b2433ac82f21dde122380dfb523859bacbe4 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Fri, 5 Apr 2019 10:20:13 -0700 Subject: [PATCH 4/5] Make script fixes --- BUILD.gn | 1 + CMakeLists.txt | 3 +++ Makefile | 3 +++ build.yaml | 1 + gRPC-C++.podspec | 1 + tools/doxygen/Doxyfile.c++ | 1 + tools/doxygen/Doxyfile.c++.internal | 1 + tools/run_tests/generated/sources_and_headers.json | 2 ++ 8 files changed, 13 insertions(+) diff --git a/BUILD.gn b/BUILD.gn index 8fed27e202e..3fd5154834f 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -1088,6 +1088,7 @@ config("grpc_config") { "include/grpcpp/server.h", "include/grpcpp/server_builder.h", "include/grpcpp/server_context.h", + "include/grpcpp/server_impl.h", "include/grpcpp/server_posix.h", "include/grpcpp/server_posix_impl.h", "include/grpcpp/support/async_stream.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index a84523e5c92..a53847588f6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3036,6 +3036,7 @@ foreach(_hdr include/grpcpp/server.h include/grpcpp/server_builder.h include/grpcpp/server_context.h + include/grpcpp/server_impl.h include/grpcpp/server_posix.h include/grpcpp/server_posix_impl.h include/grpcpp/support/async_stream.h @@ -3637,6 +3638,7 @@ foreach(_hdr include/grpcpp/server.h include/grpcpp/server_builder.h include/grpcpp/server_context.h + include/grpcpp/server_impl.h include/grpcpp/server_posix.h include/grpcpp/server_posix_impl.h include/grpcpp/support/async_stream.h @@ -4612,6 +4614,7 @@ foreach(_hdr include/grpcpp/server.h include/grpcpp/server_builder.h include/grpcpp/server_context.h + include/grpcpp/server_impl.h include/grpcpp/server_posix.h include/grpcpp/server_posix_impl.h include/grpcpp/support/async_stream.h diff --git a/Makefile b/Makefile index d1158bf31ba..b04ab44b33d 100644 --- a/Makefile +++ b/Makefile @@ -5367,6 +5367,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/server.h \ include/grpcpp/server_builder.h \ include/grpcpp/server_context.h \ + include/grpcpp/server_impl.h \ include/grpcpp/server_posix.h \ include/grpcpp/server_posix_impl.h \ include/grpcpp/support/async_stream.h \ @@ -5976,6 +5977,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/server.h \ include/grpcpp/server_builder.h \ include/grpcpp/server_context.h \ + include/grpcpp/server_impl.h \ include/grpcpp/server_posix.h \ include/grpcpp/server_posix_impl.h \ include/grpcpp/support/async_stream.h \ @@ -6900,6 +6902,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/server.h \ include/grpcpp/server_builder.h \ include/grpcpp/server_context.h \ + include/grpcpp/server_impl.h \ include/grpcpp/server_posix.h \ include/grpcpp/server_posix_impl.h \ include/grpcpp/support/async_stream.h \ diff --git a/build.yaml b/build.yaml index 888b613dc0e..42ae58793e1 100644 --- a/build.yaml +++ b/build.yaml @@ -1382,6 +1382,7 @@ filegroups: - include/grpcpp/server.h - include/grpcpp/server_builder.h - include/grpcpp/server_context.h + - include/grpcpp/server_impl.h - include/grpcpp/server_posix.h - include/grpcpp/server_posix_impl.h - include/grpcpp/support/async_stream.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 051397f1e67..2d2ee28850d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -121,6 +121,7 @@ Pod::Spec.new do |s| 'include/grpcpp/server.h', 'include/grpcpp/server_builder.h', 'include/grpcpp/server_context.h', + 'include/grpcpp/server_impl.h', 'include/grpcpp/server_posix.h', 'include/grpcpp/server_posix_impl.h', 'include/grpcpp/support/async_stream.h', diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index a56d2df8530..114d5e6dfa1 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -1011,6 +1011,7 @@ include/grpcpp/security/server_credentials_impl.h \ include/grpcpp/server.h \ include/grpcpp/server_builder.h \ include/grpcpp/server_context.h \ +include/grpcpp/server_impl.h \ include/grpcpp/server_posix.h \ include/grpcpp/server_posix_impl.h \ include/grpcpp/support/async_stream.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 6bf498307bb..4f6bd406bdf 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1013,6 +1013,7 @@ include/grpcpp/security/server_credentials_impl.h \ include/grpcpp/server.h \ include/grpcpp/server_builder.h \ include/grpcpp/server_context.h \ +include/grpcpp/server_impl.h \ include/grpcpp/server_posix.h \ include/grpcpp/server_posix_impl.h \ include/grpcpp/support/async_stream.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 71e2b0d2207..898f96ad4b9 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -10131,6 +10131,7 @@ "include/grpcpp/server.h", "include/grpcpp/server_builder.h", "include/grpcpp/server_context.h", + "include/grpcpp/server_impl.h", "include/grpcpp/server_posix.h", "include/grpcpp/server_posix_impl.h", "include/grpcpp/support/async_stream.h", @@ -10250,6 +10251,7 @@ "include/grpcpp/server.h", "include/grpcpp/server_builder.h", "include/grpcpp/server_context.h", + "include/grpcpp/server_impl.h", "include/grpcpp/server_posix.h", "include/grpcpp/server_posix_impl.h", "include/grpcpp/support/async_stream.h", From 991409798c2c8e3d8d3e742e5fda27d571427590 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Mon, 8 Apr 2019 13:09:41 -0700 Subject: [PATCH 5/5] Fix clang script errors --- include/grpcpp/server_builder_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/grpcpp/server_builder_impl.h b/include/grpcpp/server_builder_impl.h index 3a1a8da879f..f7624deba5f 100644 --- a/include/grpcpp/server_builder_impl.h +++ b/include/grpcpp/server_builder_impl.h @@ -31,8 +31,8 @@ #include #include #include -#include #include +#include struct grpc_resource_quota;