diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index df68cf31441..885bd8de8d7 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -248,22 +249,15 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { /// the \a sync_server_cqs) std::vector> sync_req_mgrs_; - // Outstanding callback requests. The vector is indexed by method with a list - // per method. Each element should store its own iterator in the list and - // should erase it when the request is actually bound to an RPC. Synchronize - // this list with its own mu_ (not the server mu_) since these must be active - // at Shutdown when the server mu_ is locked. - // TODO(vjpai): Merge with the core request matcher to avoid duplicate work - struct MethodReqList { - std::mutex reqs_mu; - // Maintain our own list size count since list::size is still linear - // for some libraries (supposed to be constant since C++11) - // TODO(vjpai): Remove reqs_list_sz and use list::size when possible - size_t reqs_list_sz{0}; - std::list reqs_list; - using iterator = decltype(reqs_list)::iterator; - }; - std::vector callback_reqs_; + // Outstanding unmatched callback requests, indexed by method. + // NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't + // copyable or movable and thus will cause compilation errors. We + // actually only want to extend the vector before the threaded use + // starts, but this is still a limitation. + std::vector callback_unmatched_reqs_count_; + + // List of callback requests to start when server actually starts. + std::list callback_reqs_to_start_; // Server status std::mutex mu_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 1e642681467..21f84de5c1e 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -350,10 +350,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { class Server::CallbackRequest final : public internal::CompletionQueueTag { public: - CallbackRequest(Server* server, Server::MethodReqList* list, + CallbackRequest(Server* server, size_t method_idx, internal::RpcServiceMethod* method, void* method_tag) : server_(server), - req_list_(list), + method_index_(method_idx), method_(method), method_tag_(method_tag), has_request_payload_( @@ -428,46 +428,31 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); GPR_ASSERT(ignored == req_); - bool spawn_new = false; - { - std::unique_lock l(req_->req_list_->reqs_mu); - req_->req_list_->reqs_list.erase(req_->req_list_iterator_); - req_->req_list_->reqs_list_sz--; - if (!ok) { - // The call has been shutdown. - // Delete its contents to free up the request. - // First release the lock in case the deletion of the request - // completes the full server shutdown and allows the destructor - // of the req_list to proceed. - l.unlock(); - delete req_; - return; - } - - // If this was the last request in the list or it is below the soft - // minimum and there are spare requests available, set up a new one, but - // do it outside the lock since the Request could otherwise deadlock - if (req_->req_list_->reqs_list_sz == 0 || - (req_->req_list_->reqs_list_sz < - SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD && - req_->server_->callback_reqs_outstanding_ < - SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) { - spawn_new = true; - } + int count = + static_cast(gpr_atm_no_barrier_fetch_add( + &req_->server_ + ->callback_unmatched_reqs_count_[req_->method_index_], + -1)) - + 1; + if (!ok) { + // The call has been shutdown. + // Delete its contents to free up the request. + delete req_; + return; } - if (spawn_new) { - auto* new_req = new CallbackRequest(req_->server_, req_->req_list_, + + // If this was the last request in the list or it is below the soft + // minimum and there are spare requests available, set up a new one. + if (count == 0 || (count < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD && + count < SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) { + auto* new_req = new CallbackRequest(req_->server_, req_->method_index_, req_->method_, req_->method_tag_); if (!new_req->Request()) { - // The server must have just decided to shutdown. Erase - // from the list under lock but release the lock before - // deleting the new_req (in case that request was what - // would allow the destruction of the req_list) - { - std::lock_guard l(new_req->req_list_->reqs_mu); - new_req->req_list_->reqs_list.erase(new_req->req_list_iterator_); - new_req->req_list_->reqs_list_sz--; - } + // The server must have just decided to shutdown. + gpr_atm_no_barrier_fetch_add( + &new_req->server_ + ->callback_unmatched_reqs_count_[new_req->method_index_], + -1); delete new_req; } } @@ -557,20 +542,17 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } void Setup() { + gpr_atm_no_barrier_fetch_add( + &server_->callback_unmatched_reqs_count_[method_index_], 1); grpc_metadata_array_init(&request_metadata_); ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); request_payload_ = nullptr; request_ = nullptr; request_status_ = Status(); - std::lock_guard l(req_list_->reqs_mu); - req_list_->reqs_list.push_front(this); - req_list_->reqs_list_sz++; - req_list_iterator_ = req_list_->reqs_list.begin(); } Server* const server_; - Server::MethodReqList* req_list_; - Server::MethodReqList::iterator req_list_iterator_; + size_t method_index_; internal::RpcServiceMethod* const method_; void* const method_tag_; const bool has_request_payload_; @@ -791,12 +773,12 @@ Server::~Server() { } grpc_server_destroy(server_); - for (auto* method_list : callback_reqs_) { - // The entries of the method_list should have already been emptied - // during Shutdown as each request is failed by Shutdown. Check that - // this actually happened. - GPR_ASSERT(method_list->reqs_list.empty()); - delete method_list; + for (auto& per_method_count : callback_unmatched_reqs_count_) { + // There should be no more unmatched callbacks for any method + // as each request is failed by Shutdown. Check that this actually + // happened + GPR_ASSERT(static_cast(gpr_atm_no_barrier_load(&per_method_count)) == + 0); } } @@ -852,6 +834,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { } const char* method_name = nullptr; + for (auto it = service->methods_.begin(); it != service->methods_.end(); ++it) { if (it->get() == nullptr) { // Handled by generic service if any. @@ -877,15 +860,15 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { } } else { // a callback method. Register at least some callback requests - callback_reqs_.push_back(new Server::MethodReqList); - auto* method_req_list = callback_reqs_.back(); + callback_unmatched_reqs_count_.push_back(0); + auto method_index = callback_unmatched_reqs_count_.size() - 1; // TODO(vjpai): Register these dynamically based on need for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { - new CallbackRequest(this, method_req_list, method, - method_registration_tag); + callback_reqs_to_start_.push_back(new CallbackRequest( + this, method_index, method, method_registration_tag)); } - // Enqueue it so that it will be Request'ed later once - // all request matchers are created at core server startup + // Enqueue it so that it will be Request'ed later after all request + // matchers are created at core server startup } method_name = method->name(); @@ -974,11 +957,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { (*it)->Start(); } - for (auto* cbmethods : callback_reqs_) { - for (auto* cbreq : cbmethods->reqs_list) { - GPR_ASSERT(cbreq->Request()); - } + for (auto* cbreq : callback_reqs_to_start_) { + GPR_ASSERT(cbreq->Request()); } + callback_reqs_to_start_.clear(); if (default_health_check_service_impl != nullptr) { default_health_check_service_impl->StartServingThread();