diff --git a/include/grpcpp/server_impl.h b/include/grpcpp/server_impl.h index 8572c74b075..5cbfee436de 100644 --- a/include/grpcpp/server_impl.h +++ b/include/grpcpp/server_impl.h @@ -291,6 +291,13 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { grpc_impl::ServerInitializer* initializer(); + // Functions to manage the server shutdown ref count. Things that increase + // the ref count are the running state of the server (take a ref at start and + // drop it at shutdown) and each running callback RPC. + void Ref(); + void UnrefWithPossibleNotify() /* LOCKS_EXCLUDED(mu_) */; + void UnrefAndWaitLocked() /* EXCLUSIVE_LOCKS_REQUIRED(mu_) */; + std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors_; @@ -315,16 +322,6 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { /// the \a sync_server_cqs) std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; - // Outstanding unmatched callback requests, indexed by method. - // NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't - // copyable or movable and thus will cause compilation errors. We - // actually only want to extend the vector before the threaded use - // starts, but this is still a limitation. - std::vector<gpr_atm> callback_unmatched_reqs_count_; - - // List of callback requests to start when server actually starts. - std::list<CallbackRequestBase*> callback_reqs_to_start_; - #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL // For registering experimental callback generic service; remove when that // method longer experimental @@ -336,25 +333,18 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { bool started_; bool shutdown_; bool shutdown_notified_; // Was notify called on the shutdown_cv_ + grpc::internal::CondVar shutdown_done_cv_; + bool shutdown_done_ = false; + std::atomic_int shutdown_refs_outstanding_{1}; grpc::internal::CondVar shutdown_cv_; - // It is ok (but not required) to nest callback_reqs_mu_ under mu_ . - // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be - // decremented under the lock in case it is the last request and enables the - // server shutdown. The increment is performance-critical since it happens - // during periods of increasing load; the decrement happens only when memory - // is maxed out, during server shutdown, or (possibly in a future version) - // during decreasing load, so it is less performance-critical. - grpc::internal::Mutex callback_reqs_mu_; - grpc::internal::CondVar callback_reqs_done_cv_; - std::atomic<intptr_t> callback_reqs_outstanding_{0}; - std::shared_ptr<GlobalCallbacks> global_callbacks_; std::vector<grpc::string> services_; - bool has_async_generic_service_{false}; - bool has_callback_generic_service_{false}; + bool has_async_generic_service_ = false; + bool has_callback_generic_service_ = false; + bool has_callback_methods_ = false; // Pointer to the wrapped grpc_server. grpc_server* server_; @@ -383,8 +373,7 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { // with this server (if any). It is set on the first call to CallbackCQ(). // It is _not owned_ by the server; ownership belongs with its internal // shutdown callback tag (invoked when the CQ is fully shutdown). 
- // It is protected by mu_ - CompletionQueue* callback_cq_ = nullptr; + CompletionQueue* callback_cq_ /* GUARDED_BY(mu_) */ = nullptr; // List of CQs passed in by user that must be Shutdown only after Server is // Shutdown. Even though this is only used with NDEBUG, instantiate it in all diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 998eb6dd4a9..e03bc6f1b5f 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -72,15 +72,39 @@ enum requested_call_type { BATCH_CALL, REGISTERED_CALL }; struct registered_method; struct requested_call { - grpc_core::ManualConstructor< - grpc_core::MultiProducerSingleConsumerQueue::Node> - mpscq_node; - requested_call_type type; - void* tag; - grpc_completion_queue* cq_bound_to_call; - grpc_call** call; + requested_call(void* tag_arg, grpc_completion_queue* call_cq, + grpc_call** call_arg, grpc_metadata_array* initial_md, + grpc_call_details* details) + : type(BATCH_CALL), + tag(tag_arg), + cq_bound_to_call(call_cq), + call(call_arg), + initial_metadata(initial_md) { + details->reserved = nullptr; + data.batch.details = details; + } + + requested_call(void* tag_arg, grpc_completion_queue* call_cq, + grpc_call** call_arg, grpc_metadata_array* initial_md, + registered_method* rm, gpr_timespec* deadline, + grpc_byte_buffer** optional_payload) + : type(REGISTERED_CALL), + tag(tag_arg), + cq_bound_to_call(call_cq), + call(call_arg), + initial_metadata(initial_md) { + data.registered.method = rm; + data.registered.deadline = deadline; + data.registered.optional_payload = optional_payload; + } + + grpc_core::MultiProducerSingleConsumerQueue::Node mpscq_node; + const requested_call_type type; + void* const tag; + grpc_completion_queue* const cq_bound_to_call; + grpc_call** const call; grpc_cq_completion completion; - grpc_metadata_array* initial_metadata; + grpc_metadata_array* const initial_metadata; union { struct { grpc_call_details* details; @@ -134,6 +158,10 @@ enum call_state { struct call_data; +grpc_call_error ValidateServerRequest( + grpc_completion_queue* cq_for_notification, void* tag, + grpc_byte_buffer** optional_payload, registered_method* rm); + // RPCs that come in from the transport must be matched against RPC requests // from the application. An incoming request from the application can be matched // to an RPC that has already arrived or can be queued up for later use. 
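Note on the requested_call rewrite above: it replaces a gpr_malloc'd POD with a constructed object, where each constructor fills in the matching arm of the union and the identifying fields become const, so a request's type can never change after construction (and, as a later hunk shows, done_request_event must now delete rather than gpr_free). A reduced sketch of the same tagged-union-with-constructors idiom, using illustrative stand-in types rather than the real gRPC ones:

// Reduced sketch of the requested_call idiom: the constructor that runs
// determines which arm of the union is active; const members keep the
// request's identity immutable for the object's lifetime.
enum class RequestKind { kBatch, kRegistered };

struct Request {
  // Unregistered ("batch") request.
  Request(void* tag_arg, int* details_arg)
      : kind(RequestKind::kBatch), tag(tag_arg) {
    data.batch.details = details_arg;
  }

  // Registered-method request.
  Request(void* tag_arg, int method_id_arg, long* deadline_arg)
      : kind(RequestKind::kRegistered), tag(tag_arg) {
    data.registered.method_id = method_id_arg;
    data.registered.deadline = deadline_arg;
  }

  const RequestKind kind;  // fixed at construction
  void* const tag;
  union {
    struct {
      int* details;
    } batch;
    struct {
      int method_id;
      long* deadline;
    } registered;
  } data;
};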
@@ -242,14 +270,26 @@ struct call_data { }; struct registered_method { - char* method; - char* host; - grpc_server_register_method_payload_handling payload_handling; - uint32_t flags; + registered_method( + const char* method_arg, const char* host_arg, + grpc_server_register_method_payload_handling payload_handling_arg, + uint32_t flags_arg) + : method(gpr_strdup(method_arg)), + host(gpr_strdup(host_arg)), + payload_handling(payload_handling_arg), + flags(flags_arg) {} + + ~registered_method() { + gpr_free(method); + gpr_free(host); + } + + char* const method; + char* const host; + const grpc_server_register_method_payload_handling payload_handling; + const uint32_t flags; /* one request matcher per method */ - // TODO(vjpai): Move this to a unique_ptr once this has a real - // constructor/destructor - RequestMatcherInterface* matcher = nullptr; + std::unique_ptr<RequestMatcherInterface> matcher; registered_method* next; }; @@ -285,6 +325,8 @@ struct grpc_server { bool starting; gpr_cv starting_cv; + // TODO(vjpai): Convert from a linked-list head pointer to a std::vector once + // grpc_server has a real constructor/destructor registered_method* registered_methods; /** one request matcher for unregistered methods */ // TODO(vjpai): Convert to a std::unique_ptr once grpc_server has a real @@ -444,7 +486,7 @@ class RealRequestMatcher : public RequestMatcherInterface { void RequestCallWithPossiblePublish(size_t request_queue_index, requested_call* call) override { - if (requests_per_cq_[request_queue_index].Push(call->mpscq_node.get())) { + if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server_->mu_call); @@ -530,6 +572,103 @@ class RealRequestMatcher : public RequestMatcherInterface { std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_; }; +// AllocatingRequestMatchers don't allow the application to request an RPC in +// advance or queue up any incoming RPC for later match. Instead, MatchOrQueue +// will call out to an allocation function passed in at the construction of the +// object. These request matchers are designed for the C++ callback API, so they +// only support 1 completion queue (passed in at the constructor). +class AllocatingRequestMatcherBase : public RequestMatcherInterface { + public: + AllocatingRequestMatcherBase(grpc_server* server, grpc_completion_queue* cq) + : server_(server), cq_(cq) { + size_t idx; + for (idx = 0; idx < server->cq_count; idx++) { + if (server->cqs[idx] == cq) { + break; + } + } + GPR_ASSERT(idx < server->cq_count); + cq_idx_ = idx; + } + + void ZombifyPending() override {} + + void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); } + + size_t request_queue_count() const override { return 0; } + + void RequestCallWithPossiblePublish(size_t /*request_queue_index*/, + requested_call* /*call*/) final { + GPR_ASSERT(false); + } + + grpc_server* server() const override { return server_; } + + // Supply the completion queue related to this request matcher + grpc_completion_queue* cq() const { return cq_; } + + // Supply the completion queue's index relative to the server. + size_t cq_idx() const { return cq_idx_; } + + private: + grpc_server* const server_; + grpc_completion_queue* const cq_; + size_t cq_idx_; +}; + +// An allocating request matcher for non-registered methods (used for generic +// API and unimplemented RPCs). 
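The two concrete matchers that follow implement the contract established by AllocatingRequestMatcherBase: there is no queue of pre-registered requests at all; an application-supplied allocator is invoked once per incoming RPC, the result is validated, and the call is published immediately. A minimal standalone model of that contract, with illustrative names rather than the real gRPC types:

#include <cassert>
#include <functional>

// Minimal model of an allocating matcher: instead of popping a pre-registered
// request off a queue, it asks the application for a fresh request object at
// match time and immediately binds the incoming call to it.
struct Allocation {
  void* tag = nullptr;    // completion tag the application wants notified
  void** call = nullptr;  // where the matcher should store the new call handle
};

class AllocatingMatcherModel {
 public:
  explicit AllocatingMatcherModel(std::function<Allocation()> allocator)
      : allocator_(std::move(allocator)) {}

  // Invoked once per incoming RPC; there is never an unmatched-request queue.
  void MatchIncomingRpc(void* incoming_call) {
    Allocation a = allocator_();  // allocate request state on demand
    assert(a.call != nullptr);    // the real code validates via ValidateServerRequest
    *a.call = incoming_call;      // bind the call to the freshly allocated slot
    NotifyCompletion(a.tag);      // hand the tag back through the completion queue
  }

 private:
  static void NotifyCompletion(void* /*tag*/) { /* stand-in for cq publication */ }
  std::function<Allocation()> allocator_;
};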
+class AllocatingRequestMatcherBatch : public AllocatingRequestMatcherBase { + public: + AllocatingRequestMatcherBatch( + grpc_server* server, grpc_completion_queue* cq, + std::function<grpc_core::ServerBatchCallAllocation()> allocator) + : AllocatingRequestMatcherBase(server, cq), + allocator_(std::move(allocator)) {} + void MatchOrQueue(size_t /*start_request_queue_index*/, + call_data* calld) override { + grpc_core::ServerBatchCallAllocation call_info = allocator_(); + GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag), + nullptr, nullptr) == GRPC_CALL_OK); + requested_call* rc = new requested_call( + static_cast<void*>(call_info.tag), cq(), call_info.call, + call_info.initial_metadata, call_info.details); + gpr_atm_no_barrier_store(&calld->state, ACTIVATED); + publish_call(server(), calld, cq_idx(), rc); + } + + private: + std::function<grpc_core::ServerBatchCallAllocation()> allocator_; +}; + +// An allocating request matcher for registered methods. +class AllocatingRequestMatcherRegistered : public AllocatingRequestMatcherBase { + public: + AllocatingRequestMatcherRegistered( + grpc_server* server, grpc_completion_queue* cq, registered_method* rm, + std::function<grpc_core::ServerRegisteredCallAllocation()> allocator) + : AllocatingRequestMatcherBase(server, cq), + registered_method_(rm), + allocator_(std::move(allocator)) {} + void MatchOrQueue(size_t /*start_request_queue_index*/, + call_data* calld) override { + grpc_core::ServerRegisteredCallAllocation call_info = allocator_(); + GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag), + call_info.optional_payload, + registered_method_) == GRPC_CALL_OK); + requested_call* rc = new requested_call( + static_cast<void*>(call_info.tag), cq(), call_info.call, + call_info.initial_metadata, registered_method_, call_info.deadline, + call_info.optional_payload); + gpr_atm_no_barrier_store(&calld->state, ACTIVATED); + publish_call(server(), calld, cq_idx(), rc); + } + + private: + registered_method* const registered_method_; + std::function<grpc_core::ServerRegisteredCallAllocation()> allocator_; +}; + /* * server proper */ @@ -546,10 +685,7 @@ void server_delete(grpc_server* server) { gpr_cv_destroy(&server->starting_cv); while ((rm = server->registered_methods) != nullptr) { server->registered_methods = rm->next; - delete rm->matcher; - gpr_free(rm->method); - gpr_free(rm->host); - gpr_free(rm); + delete rm; } delete server->unregistered_request_matcher; for (i = 0; i < server->cq_count; i++) { @@ -603,7 +739,9 @@ void destroy_channel(channel_data* chand) { op); } -void done_request_event(void* req, grpc_cq_completion* /*c*/) { gpr_free(req); } +void done_request_event(void* req, grpc_cq_completion* /*c*/) { + delete static_cast<requested_call*>(req); +} void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, requested_call* rc) { @@ -718,7 +856,8 @@ void start_new_rpc(grpc_call_element* elem) { GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } - finish_start_new_rpc(server, elem, rm->server_registered_method->matcher, + finish_start_new_rpc(server, elem, + rm->server_registered_method->matcher.get(), rm->server_registered_method->payload_handling); return; } @@ -735,7 +874,8 @@ void start_new_rpc(grpc_call_element* elem) { GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } - finish_start_new_rpc(server, elem, rm->server_registered_method->matcher, + finish_start_new_rpc(server, elem, + rm->server_registered_method->matcher.get(), 
rm->server_registered_method->payload_handling); return; } @@ -1101,7 +1241,7 @@ grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, rm = server->unregistered_request_matcher; break; case REGISTERED_CALL: - rm = rc->data.registered.method->matcher; + rm = rc->data.registered.method->matcher.get(); break; } rm->RequestCallWithPossiblePublish(cq_idx, rc); @@ -1119,6 +1259,26 @@ void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc, } } // namespace +namespace grpc_core { + +void SetServerRegisteredMethodAllocator( + grpc_server* server, grpc_completion_queue* cq, void* method_tag, + std::function<ServerRegisteredCallAllocation()> allocator) { + registered_method* rm = static_cast<registered_method*>(method_tag); + rm->matcher.reset(new AllocatingRequestMatcherRegistered( + server, cq, rm, std::move(allocator))); +} + +void SetServerBatchMethodAllocator( + grpc_server* server, grpc_completion_queue* cq, + std::function<ServerBatchCallAllocation()> allocator) { + GPR_DEBUG_ASSERT(server->unregistered_request_matcher == nullptr); + server->unregistered_request_matcher = + new AllocatingRequestMatcherBatch(server, cq, std::move(allocator)); +} + +}; // namespace grpc_core + const grpc_channel_filter grpc_server_top_filter = { server_start_transport_stream_op_batch, grpc_channel_next_op, @@ -1224,12 +1384,8 @@ void* grpc_server_register_method( flags); return nullptr; } - m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method))); - m->method = gpr_strdup(method); - m->host = gpr_strdup(host); + m = new registered_method(method, host, payload_handling, flags); m->next = server->registered_methods; - m->payload_handling = payload_handling; - m->flags = flags; server->registered_methods = m; return m; } @@ -1250,9 +1406,13 @@ void grpc_server_start(grpc_server* server) { grpc_cq_pollset(server->cqs[i]); } } - server->unregistered_request_matcher = new RealRequestMatcher(server); + if (server->unregistered_request_matcher == nullptr) { + server->unregistered_request_matcher = new RealRequestMatcher(server); + } for (registered_method* rm = server->registered_methods; rm; rm = rm->next) { - rm->matcher = new RealRequestMatcher(server); + if (rm->matcher == nullptr) { + rm->matcher.reset(new RealRequestMatcher(server)); + } } gpr_mu_lock(&server->mu_global); @@ -1523,15 +1683,51 @@ void grpc_server_add_listener( server->listeners = l; } +namespace { +grpc_call_error ValidateServerRequest( + grpc_completion_queue* cq_for_notification, void* tag, + grpc_byte_buffer** optional_payload, registered_method* rm) { + if ((rm == nullptr && optional_payload != nullptr) || + ((rm != nullptr) && ((optional_payload == nullptr) != + (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) { + return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; + } + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + } + return GRPC_CALL_OK; +} +grpc_call_error ValidateServerRequestAndCq( + size_t* cq_idx, grpc_server* server, + grpc_completion_queue* cq_for_notification, void* tag, + grpc_byte_buffer** optional_payload, registered_method* rm) { + size_t idx; + for (idx = 0; idx < server->cq_count; idx++) { + if (server->cqs[idx] == cq_for_notification) { + break; + } + } + if (idx == server->cq_count) { + return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + } + grpc_call_error error = + ValidateServerRequest(cq_for_notification, tag, optional_payload, rm); + if (error != GRPC_CALL_OK) { + return error; + } + + *cq_idx = idx; + 
return GRPC_CALL_OK; +} +} // namespace + grpc_call_error grpc_server_request_call( grpc_server* server, grpc_call** call, grpc_call_details* details, grpc_metadata_array* initial_metadata, grpc_completion_queue* cq_bound_to_call, grpc_completion_queue* cq_for_notification, void* tag) { - grpc_call_error error; grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc))); GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); GRPC_API_TRACE( "grpc_server_request_call(" @@ -1540,33 +1736,17 @@ grpc_call_error grpc_server_request_call( 7, (server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag)); + size_t cq_idx; - for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { - if (server->cqs[cq_idx] == cq_for_notification) { - break; - } - } - if (cq_idx == server->cq_count) { - gpr_free(rc); - error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; - goto done; + grpc_call_error error = ValidateServerRequestAndCq( + &cq_idx, server, cq_for_notification, tag, nullptr, nullptr); + if (error != GRPC_CALL_OK) { + return error; } - if (grpc_cq_begin_op(cq_for_notification, tag) == false) { - gpr_free(rc); - error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; - goto done; - } - details->reserved = nullptr; - rc->type = BATCH_CALL; - rc->tag = tag; - rc->cq_bound_to_call = cq_bound_to_call; - rc->call = call; - rc->data.batch.details = details; - rc->initial_metadata = initial_metadata; - error = queue_call_request(server, cq_idx, rc); -done: - - return error; + + requested_call* rc = new requested_call(tag, cq_bound_to_call, call, + initial_metadata, details); + return queue_call_request(server, cq_idx, rc); } grpc_call_error grpc_server_request_registered_call( @@ -1577,7 +1757,6 @@ grpc_call_error grpc_server_request_registered_call( grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; GRPC_STATS_INC_SERVER_REQUESTED_CALLS(); - requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc))); registered_method* rm = static_cast<registered_method*>(rmp); GRPC_API_TRACE( "grpc_server_request_registered_call(" @@ -1589,33 +1768,15 @@ grpc_call_error grpc_server_request_registered_call( cq_bound_to_call, cq_for_notification, tag)); size_t cq_idx; - for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { - if (server->cqs[cq_idx] == cq_for_notification) { - break; - } - } - if (cq_idx == server->cq_count) { - gpr_free(rc); - return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; - } - if ((optional_payload == nullptr) != - (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) { - gpr_free(rc); - return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; + grpc_call_error error = ValidateServerRequestAndCq( + &cq_idx, server, cq_for_notification, tag, optional_payload, rm); + if (error != GRPC_CALL_OK) { + return error; } - if (grpc_cq_begin_op(cq_for_notification, tag) == false) { - gpr_free(rc); - return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; - } - rc->type = REGISTERED_CALL; - rc->tag = tag; - rc->cq_bound_to_call = cq_bound_to_call; - rc->call = call; - rc->data.registered.method = rm; - rc->data.registered.deadline = deadline; - rc->initial_metadata = initial_metadata; - rc->data.registered.optional_payload = optional_payload; + requested_call* rc = + new requested_call(tag, cq_bound_to_call, call, initial_metadata, rm, + deadline, optional_payload); return queue_call_request(server, cq_idx, rc); } diff --git a/src/core/lib/surface/server.h 
b/src/core/lib/surface/server.h index 2285821e11b..3f11c83caa9 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -64,4 +64,36 @@ int grpc_server_has_open_connections(grpc_server* server); void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, size_t* pollset_count); +namespace grpc_core { + +// An object to represent the most relevant characteristics of a newly-allocated +// call object when using an AllocatingRequestMatcherBatch +struct ServerBatchCallAllocation { + grpc_experimental_completion_queue_functor* tag; + grpc_call** call; + grpc_metadata_array* initial_metadata; + grpc_call_details* details; +}; + +// An object to represent the most relevant characteristics of a newly-allocated +// call object when using an AllocatingRequestMatcherRegistered +struct ServerRegisteredCallAllocation { + grpc_experimental_completion_queue_functor* tag; + grpc_call** call; + grpc_metadata_array* initial_metadata; + gpr_timespec* deadline; + grpc_byte_buffer** optional_payload; +}; + +// Functions to specify that a specific registered method or the unregistered +// collection should use a specific allocator for request matching. +void SetServerRegisteredMethodAllocator( + grpc_server* server, grpc_completion_queue* cq, void* method_tag, + std::function<ServerRegisteredCallAllocation()> allocator); +void SetServerBatchMethodAllocator( + grpc_server* server, grpc_completion_queue* cq, + std::function<ServerBatchCallAllocation()> allocator); + +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */ diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index a30e4911dc0..2f6c2700076 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -47,6 +47,7 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/external_connection_acceptor_impl.h" #include "src/cpp/server/health/default_health_check_service.h" @@ -62,17 +63,6 @@ namespace { // max-threads set) to the server builder. 
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX -// How many callback requests of each method should we pre-register at start -#define DEFAULT_CALLBACK_REQS_PER_METHOD 512 - -// What is the (soft) limit for outstanding requests in the server -#define SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING 30000 - -// If the number of unmatched requests for a method drops below this amount, try -// to allocate extra unless it pushes the total number of callbacks above the -// soft maximum -#define SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD 128 - class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -544,74 +534,61 @@ class Server::SyncRequest final : public grpc::internal::CompletionQueueTag { grpc_completion_queue* cq_; }; -class Server::CallbackRequestBase : public grpc::internal::CompletionQueueTag { - public: - virtual ~CallbackRequestBase() {} - virtual bool Request() = 0; -}; - template <class ServerContextType> -class Server::CallbackRequest final : public Server::CallbackRequestBase { +class Server::CallbackRequest final + : public grpc::internal::CompletionQueueTag { public: static_assert( std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value, "ServerContextType must be derived from CallbackServerContext"); - // The constructor needs to know the server for this callback request and its - // index in the server's request count array to allow for proper dynamic - // requesting of incoming RPCs. For codegen services, the values of method and - // method_tag represent the defined characteristics of the method being - // requested. For generic services, method and method_tag are nullptr since - // these services don't have pre-defined methods or method registration tags. - CallbackRequest(Server* server, size_t method_idx, - grpc::internal::RpcServiceMethod* method, void* method_tag) + // For codegen services, the value of method represents the defined + // characteristics of the method being requested. For generic services, method + // is nullptr since these services don't have pre-defined methods. + CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method, + grpc::CompletionQueue* cq, + grpc_core::ServerRegisteredCallAllocation* data) : server_(server), - method_index_(method_idx), method_(method), - method_tag_(method_tag), - has_request_payload_( - method_ != nullptr && - (method->method_type() == grpc::internal::RpcMethod::NORMAL_RPC || - method->method_type() == - grpc::internal::RpcMethod::SERVER_STREAMING)), - cq_(server->CallbackCQ()), + has_request_payload_(method->method_type() == + grpc::internal::RpcMethod::NORMAL_RPC || + method->method_type() == + grpc::internal::RpcMethod::SERVER_STREAMING), + cq_(cq), tag_(this) { - server_->callback_reqs_outstanding_++; + server->Ref(); Setup(); + data->tag = &tag_; + data->call = &call_; + data->initial_metadata = &request_metadata_; + data->deadline = &deadline_; + data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr; } - ~CallbackRequest() { - Clear(); - - // The counter of outstanding requests must be decremented - // under a lock in case it causes the server shutdown. - grpc::internal::MutexLock l(&server_->callback_reqs_mu_); - if (--server_->callback_reqs_outstanding_ == 0) { - server_->callback_reqs_done_cv_.Signal(); + // For generic services, method is nullptr since these services don't have + // pre-defined methods. 
+ CallbackRequest(Server* server, grpc::CompletionQueue* cq, + grpc_core::ServerBatchCallAllocation* data) + : server_(server), + method_(nullptr), + has_request_payload_(false), + cq_(cq), + tag_(this) { + server->Ref(); + Setup(); + data->tag = &tag_; + data->call = &call_; + data->initial_metadata = &request_metadata_; + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); } + data->details = call_details_; } - bool Request() override { - if (method_tag_) { - if (grpc_server_request_registered_call( - server_->c_server(), method_tag_, &call_, &deadline_, - &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(), - cq_->cq(), static_cast<void*>(&tag_)) != GRPC_CALL_OK) { - return false; - } - } else { - if (!call_details_) { - call_details_ = new grpc_call_details; - grpc_call_details_init(call_details_); - } - if (grpc_server_request_call(server_->c_server(), &call_, call_details_, - &request_metadata_, cq_->cq(), cq_->cq(), - static_cast<void*>(&tag_)) != GRPC_CALL_OK) { - return false; - } - } - return true; + ~CallbackRequest() { + Clear(); + server_->UnrefWithPossibleNotify(); } // Needs specialization to account for different processing of metadata @@ -655,12 +632,6 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); GPR_ASSERT(ignored == req_); - int count = - static_cast<int>(gpr_atm_no_barrier_fetch_add( - &req_->server_ - ->callback_unmatched_reqs_count_[req_->method_index_], - -1)) - - 1; if (!ok) { // The call has been shutdown. // Delete its contents to free up the request. @@ -668,24 +639,6 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { return; } - // If this was the last request in the list or it is below the soft - // minimum and there are spare requests available, set up a new one. - if (count == 0 || (count < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD && - req_->server_->callback_reqs_outstanding_ < - SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) { - auto* new_req = new CallbackRequest<ServerContextType>( - req_->server_, req_->method_index_, req_->method_, - req_->method_tag_); - if (!new_req->Request()) { - // The server must have just decided to shutdown. - gpr_atm_no_barrier_fetch_add( - &new_req->server_ - ->callback_unmatched_reqs_count_[new_req->method_index_], - -1); - delete new_req; - } - } - // Bind the call, deadline, and metadata from what we got req_->ctx_.set_call(req_->call_); req_->ctx_.cq_ = req_->cq_; @@ -740,28 +693,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { : req_->server_->generic_handler_.get(); handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, - req_->handler_data_, [this] { - // Recycle this request if there aren't too many outstanding. - // Note that we don't have to worry about a case where there - // are no requests waiting to match for this method since that - // is already taken care of when binding a request to a call. - // TODO(vjpai): Also don't recycle this request if the dynamic - // load no longer justifies it. Consider measuring - // dynamic load and setting a target accordingly. 
- if (req_->server_->callback_reqs_outstanding_ < - SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) { - req_->Clear(); - req_->Setup(); - } else { - // We can free up this request because there are too many - delete req_; - return; - } - if (!req_->Request()) { - // The server must have just decided to shutdown. - delete req_; - } - })); + req_->handler_data_, [this] { delete req_; })); } }; @@ -779,8 +711,6 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { } void Setup() { - gpr_atm_no_barrier_fetch_add( - &server_->callback_unmatched_reqs_count_[method_index_], 1); grpc_metadata_array_init(&request_metadata_); ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); request_payload_ = nullptr; @@ -790,9 +720,7 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { } Server* const server_; - const size_t method_index_; grpc::internal::RpcServiceMethod* const method_; - void* const method_tag_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; void* request_; @@ -1055,13 +983,6 @@ Server::~Server() { } grpc_server_destroy(server_); - for (auto& per_method_count : callback_unmatched_reqs_count_) { - // There should be no more unmatched callbacks for any method - // as each request is failed by Shutdown. Check that this actually - // happened - GPR_ASSERT(static_cast<int>(gpr_atm_no_barrier_load(&per_method_count)) == - 0); - } } void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { @@ -1139,17 +1060,16 @@ bool Server::RegisterService(const grpc::string* host, grpc::Service* service) { value->AddSyncMethod(method.get(), method_registration_tag); } } else { - // a callback method. Register at least some callback requests - callback_unmatched_reqs_count_.push_back(0); - auto method_index = callback_unmatched_reqs_count_.size() - 1; - // TODO(vjpai): Register these dynamically based on need - for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - callback_reqs_to_start_.push_back( - new CallbackRequest<grpc::CallbackServerContext>( - this, method_index, method.get(), method_registration_tag)); - } - // Enqueue it so that it will be Request'ed later after all request - // matchers are created at core server startup + has_callback_methods_ = true; + grpc::internal::RpcServiceMethod* method_value = method.get(); + grpc::CompletionQueue* cq = CallbackCQ(); + grpc_core::SetServerRegisteredMethodAllocator( + server_, cq->cq(), method_registration_tag, [this, cq, method_value] { + grpc_core::ServerRegisteredCallAllocation result; + new CallbackRequest<grpc::CallbackServerContext>(this, method_value, + cq, &result); + return result; + }); } method_name = method->name(); @@ -1183,14 +1103,12 @@ void Server::RegisterCallbackGenericService( has_callback_generic_service_ = true; generic_handler_.reset(service->Handler()); - callback_unmatched_reqs_count_.push_back(0); - auto method_index = callback_unmatched_reqs_count_.size() - 1; - // TODO(vjpai): Register these dynamically based on need - for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - callback_reqs_to_start_.push_back( - new CallbackRequest<grpc::GenericCallbackServerContext>( - this, method_index, nullptr, nullptr)); - } + grpc::CompletionQueue* cq = CallbackCQ(); + grpc_core::SetServerBatchMethodAllocator(server_, cq->cq(), [this, cq] { + grpc_core::ServerBatchCallAllocation result; + new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result); + return result; + }); } int Server::AddListeningPort(const grpc::string& addr, @@ -1201,6 +1119,31 @@ int 
Server::AddListeningPort(const grpc::string& addr, return port; } +void Server::Ref() { + shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed); +} + +void Server::UnrefWithPossibleNotify() { + if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + // No refs outstanding means that shutdown has been initiated and no more + // callback requests are outstanding. + grpc::internal::MutexLock lock(&mu_); + GPR_ASSERT(shutdown_); + shutdown_done_ = true; + shutdown_done_cv_.Signal(); + } +} + +void Server::UnrefAndWaitLocked() { + if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub( + 1, std::memory_order_acq_rel) == 1)) { + shutdown_done_ = true; + return; // no need to wait on CV since done condition already set + } + shutdown_done_cv_.WaitUntil(&mu_, [this] { return shutdown_done_; }); +} + void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); global_callbacks_->PreServerStart(this); @@ -1236,7 +1179,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // If this server uses callback methods, then create a callback generic // service to handle any unimplemented methods using the default reactor // creator - if (!callback_reqs_to_start_.empty() && !has_callback_generic_service_) { + if (has_callback_methods_ && !has_callback_generic_service_) { unimplemented_service_.reset(new grpc::CallbackGenericService); RegisterCallbackGenericService(unimplemented_service_.get()); } @@ -1276,11 +1219,6 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { value->Start(); } - for (auto* cbreq : callback_reqs_to_start_) { - GPR_ASSERT(cbreq->Request()); - } - callback_reqs_to_start_.clear(); - if (default_health_check_service_impl != nullptr) { default_health_check_service_impl->StartServingThread(); } @@ -1333,23 +1271,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) { value->Wait(); } - // Wait for all outstanding callback requests to complete - // (whether waiting for a match or already active). - // We know that no new requests will be created after this point - // because they are only created at server startup time or when - // we have a successful match on a request. During the shutdown phase, - // requests that have not yet matched will be failed rather than - // allowed to succeed, which will cause the server to delete the - // request and decrement the count. Possibly a request will match before - // the shutdown but then find that shutdown has already started by the - // time it tries to register a new request. In that case, the registration - // will report a failure, indicating a shutdown and again we won't end - // up incrementing the counter. - { - grpc::internal::MutexLock cblock(&callback_reqs_mu_); - callback_reqs_done_cv_.WaitUntil( - &callback_reqs_mu_, [this] { return callback_reqs_outstanding_ == 0; }); - } + // Drop the shutdown ref and wait for all other refs to drop as well. + UnrefAndWaitLocked(); // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it // will delete itself at true shutdown. 
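Ref(), UnrefWithPossibleNotify(), and UnrefAndWaitLocked() above fold the old callback_reqs_mu_/callback_reqs_done_cv_/callback_reqs_outstanding_ trio into a single atomic whose final decrement marks shutdown as complete. A self-contained sketch of that pattern using standard-library primitives (an illustrative class, not the Server implementation; note the real UnrefAndWaitLocked is called with mu_ already held, whereas this sketch takes its own lock):

#include <atomic>
#include <condition_variable>
#include <mutex>

// Sketch of the shutdown ref-count pattern: one ref for the running server,
// one per in-flight callback RPC. Whoever drops the last ref marks shutdown
// as done and wakes the waiter in the shutdown path.
class ShutdownRefs {
 public:
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Dropped by a finishing callback RPC; notifies if it was the last ref.
  void UnrefWithPossibleNotify() {
    if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      std::lock_guard<std::mutex> lock(mu_);
      done_ = true;
      done_cv_.notify_one();
    }
  }

  // Dropped once during shutdown; blocks until every other ref is gone.
  void UnrefAndWait() {
    std::unique_lock<std::mutex> lock(mu_);
    if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      done_ = true;
      return;  // nothing outstanding, no need to wait on the condvar
    }
    done_cv_.wait(lock, [this] { return done_; });
  }

 private:
  std::atomic<int> refs_{1};  // starts at 1 for the running server
  std::mutex mu_;
  std::condition_variable done_cv_;
  bool done_ = false;
};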
@@ -1421,15 +1344,17 @@ grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered grpc::internal::MutexLock l(&mu_); - if (callback_cq_ == nullptr) { - auto* shutdown_callback = new grpc::ShutdownCallback; - callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, - shutdown_callback}); - - // Transfer ownership of the new cq to its own shutdown callback - shutdown_callback->TakeCQ(callback_cq_); + if (callback_cq_ != nullptr) { + return callback_cq_; } + auto* shutdown_callback = new grpc::ShutdownCallback; + callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + return callback_cq_; }
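The restructured CallbackCQ() above is the usual lazily-created, mutex-guarded singleton with an early return once the queue exists. A stripped-down sketch of that shape, with stand-in types rather than grpc::CompletionQueue:

#include <mutex>

// Stripped-down model of the lazy CallbackCQ() pattern: return the queue if a
// prior call already created it, otherwise create it exactly once under mu_.
// Ownership of the real queue is handed to its shutdown callback; here the
// stand-in is simply cached for the lifetime of the owner.
class LazyCq {
 public:
  struct Queue {};  // stand-in for grpc::CompletionQueue

  Queue* GetOrCreate() {
    std::lock_guard<std::mutex> lock(mu_);
    if (cq_ != nullptr) {
      return cq_;  // created on an earlier call
    }
    cq_ = new Queue;  // first call: create and cache
    return cq_;
  }

 private:
  std::mutex mu_;
  Queue* cq_ = nullptr;  // conceptually GUARDED_BY(mu_)
};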