Merge pull request #17909 from vjpai/delist

Replace list of outstanding callback requests with count
pull/17916/head
Vijay Pai, 6 years ago, committed by GitHub
commit 977df7208a
Changed files:
  1. include/grpcpp/server.h (26)
  2. src/cpp/server/server_cc.cc (88)

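This PR replaces the per-method list of outstanding callback requests (each list guarded by its own mutex, with every request storing and erasing its own list iterator) with a single atomic counter per method. The sketch below is not gRPC source; it is a minimal illustration of the new bookkeeping pattern, with invented names (PerMethodBookkeeping, OnRequestCreated, OnRequestMatched):

    // Illustration only: one gpr_atm counter per registered callback method.
    #include <grpc/support/atm.h>

    #include <cstddef>
    #include <vector>

    struct PerMethodBookkeeping {
      std::vector<gpr_atm> unmatched_count;  // one slot per callback method

      // Called when a request object is created for a method (compare the
      // increment added to CallbackRequest::Setup in this PR).
      void OnRequestCreated(size_t method_idx) {
        gpr_atm_no_barrier_fetch_add(&unmatched_count[method_idx], 1);
      }

      // Called when a request is matched to an RPC; returns how many
      // unmatched requests remain for that method afterwards.
      int OnRequestMatched(size_t method_idx) {
        return static_cast<int>(gpr_atm_no_barrier_fetch_add(
                   &unmatched_count[method_idx], -1)) -
               1;
      }
    };
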
include/grpcpp/server.h

@@ -26,6 +26,7 @@
 #include <vector>

 #include <grpc/compression.h>
+#include <grpc/support/atm.h>
 #include <grpcpp/completion_queue.h>
 #include <grpcpp/impl/call.h>
 #include <grpcpp/impl/codegen/client_interceptor.h>

@@ -248,22 +249,15 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
   /// the \a sync_server_cqs)
   std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;

-  // Outstanding callback requests. The vector is indexed by method with a list
-  // per method. Each element should store its own iterator in the list and
-  // should erase it when the request is actually bound to an RPC. Synchronize
-  // this list with its own mu_ (not the server mu_) since these must be active
-  // at Shutdown when the server mu_ is locked.
-  // TODO(vjpai): Merge with the core request matcher to avoid duplicate work
-  struct MethodReqList {
-    std::mutex reqs_mu;
-    // Maintain our own list size count since list::size is still linear
-    // for some libraries (supposed to be constant since C++11)
-    // TODO(vjpai): Remove reqs_list_sz and use list::size when possible
-    size_t reqs_list_sz{0};
-    std::list<CallbackRequest*> reqs_list;
-    using iterator = decltype(reqs_list)::iterator;
-  };
-  std::vector<MethodReqList*> callback_reqs_;
+  // Outstanding unmatched callback requests, indexed by method.
+  // NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't
+  // copyable or movable and thus will cause compilation errors. We
+  // actually only want to extend the vector before the threaded use
+  // starts, but this is still a limitation.
+  std::vector<gpr_atm> callback_unmatched_reqs_count_;
+
+  // List of callback requests to start when server actually starts.
+  std::list<CallbackRequest*> callback_reqs_to_start_;

   // Server status
   std::mutex mu_;

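The NOTE in the new comment points at a real C++ constraint: std::atomic<int> is neither copyable nor movable, so a std::vector<std::atomic<int>> cannot be grown with push_back, which is exactly what RegisterService needs to do. A short sketch of the limitation (illustration only, not part of this diff):

    #include <atomic>
    #include <vector>

    void atomic_vector_limitation() {
      std::vector<std::atomic<int>> counts;
      // counts.push_back(0);  // does not compile: std::atomic is not movable
      counts = std::vector<std::atomic<int>>(4);  // sizing up front is fine
      counts[0].fetch_add(1, std::memory_order_relaxed);
    }

gpr_atm sidesteps this because it is a plain integral type, at the cost that the vector is only safe to extend before any concurrent use begins, which is the limitation the comment concedes.
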
src/cpp/server/server_cc.cc

@@ -350,10 +350,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {

 class Server::CallbackRequest final : public internal::CompletionQueueTag {
  public:
-  CallbackRequest(Server* server, Server::MethodReqList* list,
+  CallbackRequest(Server* server, size_t method_idx,
                   internal::RpcServiceMethod* method, void* method_tag)
       : server_(server),
-        req_list_(list),
+        method_index_(method_idx),
         method_(method),
         method_tag_(method_tag),
         has_request_payload_(

@@ -428,46 +428,31 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
      GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
      GPR_ASSERT(ignored == req_);

-      bool spawn_new = false;
-      {
-        std::unique_lock<std::mutex> l(req_->req_list_->reqs_mu);
-        req_->req_list_->reqs_list.erase(req_->req_list_iterator_);
-        req_->req_list_->reqs_list_sz--;
-        if (!ok) {
-          // The call has been shutdown.
-          // Delete its contents to free up the request.
-          // First release the lock in case the deletion of the request
-          // completes the full server shutdown and allows the destructor
-          // of the req_list to proceed.
-          l.unlock();
-          delete req_;
-          return;
-        }
-        // If this was the last request in the list or it is below the soft
-        // minimum and there are spare requests available, set up a new one, but
-        // do it outside the lock since the Request could otherwise deadlock
-        if (req_->req_list_->reqs_list_sz == 0 ||
-            (req_->req_list_->reqs_list_sz <
-                 SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
-             req_->server_->callback_reqs_outstanding_ <
-                 SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
-          spawn_new = true;
-        }
-      }
-      if (spawn_new) {
-        auto* new_req = new CallbackRequest(req_->server_, req_->req_list_,
+      int count =
+          static_cast<int>(gpr_atm_no_barrier_fetch_add(
+              &req_->server_
+                   ->callback_unmatched_reqs_count_[req_->method_index_],
+              -1)) -
+          1;
+      if (!ok) {
+        // The call has been shutdown.
+        // Delete its contents to free up the request.
+        delete req_;
+        return;
+      }
+      // If this was the last request in the list or it is below the soft
+      // minimum and there are spare requests available, set up a new one.
+      if (count == 0 || (count < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
+                         count < SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
+        auto* new_req = new CallbackRequest(req_->server_, req_->method_index_,
                                             req_->method_, req_->method_tag_);
         if (!new_req->Request()) {
-          // The server must have just decided to shutdown. Erase
-          // from the list under lock but release the lock before
-          // deleting the new_req (in case that request was what
-          // would allow the destruction of the req_list)
-          {
-            std::lock_guard<std::mutex> l(new_req->req_list_->reqs_mu);
-            new_req->req_list_->reqs_list.erase(new_req->req_list_iterator_);
-            new_req->req_list_->reqs_list_sz--;
-          }
+          // The server must have just decided to shutdown.
+          gpr_atm_no_barrier_fetch_add(
+              &new_req->server_
+                   ->callback_unmatched_reqs_count_[new_req->method_index_],
+              -1);
           delete new_req;
         }
       }
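
For review purposes, the replenish decision above can be read in isolation. The following is a hedged standalone sketch with stand-in constants; kSoftMinimumSpare and kSoftMaximum are assumed values, not the real SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD / SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING definitions:

    #include <grpc/support/atm.h>

    constexpr int kSoftMinimumSpare = 2;  // assumed value, illustration only
    constexpr int kSoftMaximum = 30000;   // assumed value, illustration only

    // Atomically take one unmatched request for this method and report
    // whether the caller should allocate and Request() a replacement.
    bool ShouldSpawnReplacement(gpr_atm* per_method_count) {
      // fetch_add returns the previous value, so subtract 1 to get the
      // number of requests still unmatched after this one.
      int count = static_cast<int>(
                      gpr_atm_no_barrier_fetch_add(per_method_count, -1)) -
                  1;
      return count == 0 ||
             (count < kSoftMinimumSpare && count < kSoftMaximum);
    }

Note that if Request() on the replacement fails because the server is shutting down, the new code undoes the increment that Setup() performed, keeping the counter consistent.
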
@@ -557,20 +542,17 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
   }

   void Setup() {
+    gpr_atm_no_barrier_fetch_add(
+        &server_->callback_unmatched_reqs_count_[method_index_], 1);
     grpc_metadata_array_init(&request_metadata_);
     ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME));
     request_payload_ = nullptr;
     request_ = nullptr;
     request_status_ = Status();
-    std::lock_guard<std::mutex> l(req_list_->reqs_mu);
-    req_list_->reqs_list.push_front(this);
-    req_list_->reqs_list_sz++;
-    req_list_iterator_ = req_list_->reqs_list.begin();
   }

   Server* const server_;
-  Server::MethodReqList* req_list_;
-  Server::MethodReqList::iterator req_list_iterator_;
+  size_t method_index_;
   internal::RpcServiceMethod* const method_;
   void* const method_tag_;
   const bool has_request_payload_;
@@ -791,12 +773,12 @@ Server::~Server() {
   }

   grpc_server_destroy(server_);

-  for (auto* method_list : callback_reqs_) {
-    // The entries of the method_list should have already been emptied
-    // during Shutdown as each request is failed by Shutdown. Check that
-    // this actually happened.
-    GPR_ASSERT(method_list->reqs_list.empty());
-    delete method_list;
+  for (auto& per_method_count : callback_unmatched_reqs_count_) {
+    // There should be no more unmatched callbacks for any method
+    // as each request is failed by Shutdown. Check that this actually
+    // happened
+    GPR_ASSERT(static_cast<int>(gpr_atm_no_barrier_load(&per_method_count)) ==
+               0);
   }
 }
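
With the list gone, the destructor check reduces to verifying that every per-method counter returned to zero once Shutdown has failed all outstanding requests. A standalone sketch of that invariant check (hypothetical helper, not gRPC source):

    #include <grpc/support/atm.h>

    #include <cassert>
    #include <vector>

    void CheckAllCallbackRequestsDrained(std::vector<gpr_atm>* unmatched_count) {
      for (gpr_atm& per_method_count : *unmatched_count) {
        // Mirrors the GPR_ASSERT above: no unmatched request may survive.
        assert(static_cast<int>(gpr_atm_no_barrier_load(&per_method_count)) == 0);
        (void)per_method_count;  // silence unused warning when NDEBUG is set
      }
    }
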
@@ -852,6 +834,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
   }
   const char* method_name = nullptr;
   for (auto it = service->methods_.begin(); it != service->methods_.end();
        ++it) {
     if (it->get() == nullptr) {  // Handled by generic service if any.

@@ -877,15 +860,15 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
       }
     } else {
       // a callback method. Register at least some callback requests
-      callback_reqs_.push_back(new Server::MethodReqList);
-      auto* method_req_list = callback_reqs_.back();
+      callback_unmatched_reqs_count_.push_back(0);
+      auto method_index = callback_unmatched_reqs_count_.size() - 1;
       // TODO(vjpai): Register these dynamically based on need
       for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
-        new CallbackRequest(this, method_req_list, method,
-                            method_registration_tag);
+        callback_reqs_to_start_.push_back(new CallbackRequest(
+            this, method_index, method, method_registration_tag));
       }
-      // Enqueue it so that it will be Request'ed later once
-      // all request matchers are created at core server startup
+      // Enqueue it so that it will be Request'ed later after all request
+      // matchers are created at core server startup
     }
     method_name = method->name();
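
RegisterService now only needs an index per callback method plus a staging list, since the pre-created requests cannot be Request()'ed until the core request matchers exist. A hedged sketch of that flow with hypothetical stand-in types (FakeCallbackRequest and RegistrationState are not gRPC classes):

    #include <grpc/support/atm.h>

    #include <cstddef>
    #include <list>
    #include <vector>

    struct FakeCallbackRequest {  // stand-in for Server::CallbackRequest
      FakeCallbackRequest(std::vector<gpr_atm>* counts, size_t idx)
          : method_index(idx) {
        // In the real code this increment lives in CallbackRequest::Setup().
        gpr_atm_no_barrier_fetch_add(&(*counts)[idx], 1);
      }
      size_t method_index;
    };

    struct RegistrationState {
      std::vector<gpr_atm> unmatched_count;
      std::list<FakeCallbackRequest*> to_start;

      void RegisterCallbackMethod(int default_reqs_per_method) {
        unmatched_count.push_back(0);
        size_t method_index = unmatched_count.size() - 1;
        for (int i = 0; i < default_reqs_per_method; i++) {
          // Ownership stays with the staging list until server startup.
          to_start.push_back(
              new FakeCallbackRequest(&unmatched_count, method_index));
        }
      }
    };
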
@@ -974,11 +957,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
     (*it)->Start();
   }

-  for (auto* cbmethods : callback_reqs_) {
-    for (auto* cbreq : cbmethods->reqs_list) {
-      GPR_ASSERT(cbreq->Request());
-    }
-  }
+  for (auto* cbreq : callback_reqs_to_start_) {
+    GPR_ASSERT(cbreq->Request());
+  }
+  callback_reqs_to_start_.clear();

   if (default_health_check_service_impl != nullptr) {
     default_health_check_service_impl->StartServingThread();
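
Start() then just drains the staging list once the core server is running and clears it so nothing is replayed later; from that point on, matched requests replenish themselves as in the Run() hunk above. A minimal sketch of this deferred-start pattern, with a generic callable standing in for CallbackRequest::Request():

    #include <cassert>
    #include <functional>
    #include <list>

    void DrainToStartList(std::list<std::function<bool()>>* to_start) {
      for (auto& request : *to_start) {
        bool requested = request();  // corresponds to GPR_ASSERT(cbreq->Request())
        assert(requested);
        (void)requested;
      }
      to_start->clear();  // the staging list is only needed once, at startup
    }
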
