diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 8d7c21107f4..79b182c4515 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -693,6 +693,10 @@ typedef struct grpc_experimental_completion_queue_functor {
       pointer to this functor and a boolean that indicates whether the
       operation succeeded (non-zero) or failed (zero) */
   void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
+
+  /** The following fields are not API. They are meant for internal use. */
+  int internal_success;
+  struct grpc_experimental_completion_queue_functor* internal_next;
 } grpc_experimental_completion_queue_functor;
 
 /* The upgrade to version 2 is currently experimental. */
diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h
index cdcac186cb6..5bbbd704a02 100644
--- a/include/grpcpp/server.h
+++ b/include/grpcpp/server.h
@@ -248,8 +248,22 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
   /// the \a sync_server_cqs)
   std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
 
-  /// Outstanding callback requests
-  std::vector<std::unique_ptr<CallbackRequest>> callback_reqs_;
+  // Outstanding callback requests. The vector is indexed by method with a
+  // list per method. Each element should store its own iterator
+  // in the list and should erase it when the request is actually bound to
+  // an RPC. Synchronize this list with its own mu_ (not the server mu_) since
+  // these must be active at Shutdown when the server mu_ is locked.
+  // TODO(vjpai): Merge with the core request matcher to avoid duplicate work
+  struct MethodReqList {
+    std::mutex reqs_mu;
+    // Maintain our own list size count since list::size is still linear
+    // for some libraries (supposed to be constant since C++11)
+    // TODO(vjpai): Remove reqs_list_sz and use list::size when possible
+    size_t reqs_list_sz{0};
+    std::list<CallbackRequest*> reqs_list;
+    using iterator = decltype(reqs_list)::iterator;
+  };
+  std::vector<MethodReqList*> callback_reqs_;
 
   // Server status
   std::mutex mu_;
@@ -259,6 +273,18 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
   std::condition_variable shutdown_cv_;
 
+  // It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
+  // Incrementing callback_reqs_outstanding_ is ok without a lock
+  // but it should only be decremented under the lock in case it is the
+  // last request and enables the server shutdown. The increment is
+  // performance-critical since it happens during periods of increasing
+  // load; the decrement happens only when memory is maxed out, during server
+  // shutdown, or (possibly in a future version) during decreasing load, so
+  // it is less performance-critical.
+  std::mutex callback_reqs_mu_;
+  std::condition_variable callback_reqs_done_cv_;
+  std::atomic_int callback_reqs_outstanding_{0};
+
   std::shared_ptr<GlobalCallbacks> global_callbacks_;
 
   std::vector<grpc::string> services_;
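Note on the MethodReqList comment above: it describes an intrusive bookkeeping idiom in which each element stores its own std::list iterator so it can erase itself in O(1), relying on the guarantee that std::list iterators stay valid when other elements are inserted or erased. A minimal standalone sketch of that idiom follows; the names are illustrative, not gRPC code:

    #include <list>
    #include <mutex>

    struct PendingReq {
      // Our own position in the owning list, recorded at insertion time.
      std::list<PendingReq*>::iterator self;
    };

    struct ReqList {
      std::mutex mu;
      size_t sz{0};  // tracked manually: list::size may be O(n) in old libraries
      std::list<PendingReq*> reqs;

      void Add(PendingReq* r) {
        std::lock_guard<std::mutex> l(mu);
        reqs.push_front(r);
        sz++;
        r->self = reqs.begin();  // stays valid across other inserts/erases
      }
      void Remove(PendingReq* r) {  // O(1): no search through the list
        std::lock_guard<std::mutex> l(mu);
        reqs.erase(r->self);
        sz--;
      }
    };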
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 683dd2f6493..f45def43397 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -115,6 +115,7 @@ grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
 
 namespace grpc_core {
 GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_);
+GPR_TLS_CLASS_DEF(ApplicationCallbackExecCtx::callback_exec_ctx_);
 
 // WARNING: for testing purposes only!
 void ExecCtx::TestOnlyGlobalInit(gpr_timespec new_val) {
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index e90eb54cd35..36c1a907cbc 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -21,12 +21,14 @@
 #include <grpc/support/port_platform.h>
 
+#include <grpc/impl/codegen/grpc_types.h>
 #include <grpc/support/atm.h>
 #include <grpc/support/cpu.h>
 
 #include "src/core/lib/gpr/tls.h"
 #include "src/core/lib/gprpp/fork.h"
+#include "src/core/lib/gprpp/memory.h"
 #include "src/core/lib/iomgr/closure.h"
 
 typedef int64_t grpc_millis;
@@ -34,9 +36,8 @@ typedef int64_t grpc_millis;
 #define GRPC_MILLIS_INF_FUTURE INT64_MAX
 #define GRPC_MILLIS_INF_PAST INT64_MIN
 
-/** A workqueue represents a list of work to be executed asynchronously.
-    Forward declared here to avoid a circular dependency with workqueue.h. */
-typedef struct grpc_workqueue grpc_workqueue;
+/** A combiner represents a list of work to be executed later.
+    Forward declared here to avoid a circular dependency with combiner.h. */
 typedef struct grpc_combiner grpc_combiner;
 
 /* This exec_ctx is ready to return: either pre-populated, or cached as soon as
@@ -226,6 +227,56 @@ class ExecCtx {
   GPR_TLS_CLASS_DECL(exec_ctx_);
   ExecCtx* last_exec_ctx_ = Get();
 };
+
+class ApplicationCallbackExecCtx {
+ public:
+  ApplicationCallbackExecCtx() {
+    if (reinterpret_cast<ApplicationCallbackExecCtx*>(
+            gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
+      grpc_core::Fork::IncExecCtxCount();
+      gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(this));
+    }
+  }
+  ~ApplicationCallbackExecCtx() {
+    if (reinterpret_cast<ApplicationCallbackExecCtx*>(
+            gpr_tls_get(&callback_exec_ctx_)) == this) {
+      while (head_ != nullptr) {
+        auto* f = head_;
+        head_ = f->internal_next;
+        if (f->internal_next == nullptr) {
+          tail_ = nullptr;
+        }
+        (*f->functor_run)(f, f->internal_success);
+      }
+      gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
+      grpc_core::Fork::DecExecCtxCount();
+    } else {
+      GPR_DEBUG_ASSERT(head_ == nullptr);
+      GPR_DEBUG_ASSERT(tail_ == nullptr);
+    }
+  }
+  static void Enqueue(grpc_experimental_completion_queue_functor* functor,
+                      int is_success) {
+    functor->internal_success = is_success;
+    functor->internal_next = nullptr;
+
+    auto* ctx = reinterpret_cast<ApplicationCallbackExecCtx*>(
+        gpr_tls_get(&callback_exec_ctx_));
+
+    if (ctx->head_ == nullptr) {
+      ctx->head_ = functor;
+    }
+    if (ctx->tail_ != nullptr) {
+      ctx->tail_->internal_next = functor;
+    }
+    ctx->tail_ = functor;
+  }
+
+ private:
+  grpc_experimental_completion_queue_functor* head_{nullptr};
+  grpc_experimental_completion_queue_functor* tail_{nullptr};
+  GPR_TLS_CLASS_DECL(callback_exec_ctx_);
+};
 }  // namespace grpc_core
 
 #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 2703e1a0b77..34683273cf6 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -111,6 +111,13 @@ size_t Executor::RunClosures(const char* executor_name,
                              grpc_closure_list list) {
   size_t n = 0;
 
+  // In the executor, the ExecCtx for the thread is declared
+  // in the executor thread itself, but this is the point where we
+  // could start seeing application-level callbacks. No need to
+  // create a new ExecCtx, though, since there already is one and it is
+  // flushed (but not destructed) in this function itself
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+
   grpc_closure* c = list.head;
   while (c != nullptr) {
     grpc_closure* next = c->next_data.next;
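Note on ApplicationCallbackExecCtx above: it is a thread-local, RAII-scoped functor queue. Enqueue chains functors through internal_next, and only the outermost instance on the thread drains the chain in its destructor, so application-supplied callbacks never run while core code is still on the stack (and callbacks they enqueue in turn are drained by the same loop). A simplified standalone sketch of the same pattern, using C++ thread_local in place of the GPR_TLS macros; all types and names here are illustrative:

    #include <cstdio>

    struct Functor {
      void (*run)(Functor*, int);
      int success;
      Functor* next;
    };

    class CallbackScope {
     public:
      CallbackScope() {
        if (current_ == nullptr) current_ = this;  // outermost scope owns draining
      }
      ~CallbackScope() {
        if (current_ != this) return;  // nested scope: defer to the outermost one
        while (head_ != nullptr) {     // drain; callbacks may enqueue more
          Functor* f = head_;
          head_ = f->next;
          if (head_ == nullptr) tail_ = nullptr;
          f->run(f, f->success);
        }
        current_ = nullptr;
      }
      static void Enqueue(Functor* f, int success) {
        f->success = success;
        f->next = nullptr;
        if (head_ == nullptr) head_ = f;
        if (tail_ != nullptr) tail_->next = f;
        tail_ = f;
      }

     private:
      static thread_local CallbackScope* current_;
      static thread_local Functor* head_;
      static thread_local Functor* tail_;
    };
    thread_local CallbackScope* CallbackScope::current_ = nullptr;
    thread_local Functor* CallbackScope::head_ = nullptr;
    thread_local Functor* CallbackScope::tail_ = nullptr;

    int main() {
      Functor f{+[](Functor*, int ok) { std::printf("ran, ok=%d\n", ok); }, 0,
                nullptr};
      CallbackScope scope;            // outermost scope on this thread
      CallbackScope::Enqueue(&f, 1);  // deferred until `scope` unwinds
    }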
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index cb123298cf5..1da242938a2 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -105,6 +105,13 @@ void grpc_timer_manager_tick() {
 }
 
 static void run_some_timers() {
+  // In the case of timers, the ExecCtx for the thread is declared
+  // in the timer thread itself, but this is the point where we
+  // could start seeing application-level callbacks. No need to
+  // create a new ExecCtx, though, since there already is one and it is
+  // flushed (but not destructed) in this function itself
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+
   // if there's something to execute...
   gpr_mu_lock(&g_mu);
   // remove a waiter from the pool, and start another thread if necessary
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 89b3f77822c..d53eb704420 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -556,6 +556,7 @@ void grpc_call_unref(grpc_call* c) {
   GPR_TIMER_SCOPE("grpc_call_unref", 0);
 
   child_call* cc = c->child;
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
 
   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
@@ -597,6 +598,7 @@ void grpc_call_unref(grpc_call* c) {
 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
   GPR_ASSERT(!reserved);
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   cancel_with_error(call, GRPC_ERROR_CANCELLED);
   return GRPC_CALL_OK;
@@ -646,6 +648,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                              grpc_status_code status,
                                              const char* description,
                                              void* reserved) {
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   GRPC_API_TRACE(
       "grpc_call_cancel_with_status("
@@ -1894,7 +1897,6 @@ done_with_error:
 
 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
                                       size_t nops, void* tag, void* reserved) {
-  grpc_core::ExecCtx exec_ctx;
   grpc_call_error err;
 
   GRPC_API_TRACE(
@@ -1905,6 +1907,8 @@ grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
   if (reserved != nullptr) {
     err = GRPC_CALL_ERROR;
   } else {
+    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+    grpc_core::ExecCtx exec_ctx;
     err = call_start_batch(call, ops, nops, tag, 0);
   }
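Note on the ordering in the entry points above: callback_exec_ctx is always declared before exec_ctx. C++ destroys locals in reverse declaration order, so the core ExecCtx is flushed and destroyed first, and only then does the ApplicationCallbackExecCtx drain whatever functors that flush enqueued. A tiny sketch of the mechanism with illustrative types (not gRPC code):

    #include <cstdio>

    struct Outer {  // stands in for grpc_core::ApplicationCallbackExecCtx
      ~Outer() { std::printf("2. drain application callbacks\n"); }
    };
    struct Inner {  // stands in for grpc_core::ExecCtx
      ~Inner() { std::printf("1. flush core closures (may enqueue callbacks)\n"); }
    };

    void entry_point() {
      Outer callback_exec_ctx;  // declared first => destroyed last
      Inner exec_ctx;           // destroyed first, before any callback runs
    }

    int main() { entry_point(); }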
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 661022ec5f1..426a4a3f24e 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -868,7 +868,7 @@ static void cq_end_op_for_callback(
   GRPC_ERROR_UNREF(error);
 
   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
-  (*functor->functor_run)(functor, is_success);
+  grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success);
 }
 
 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@@ -1352,7 +1352,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
   GPR_ASSERT(cqd->shutdown_called);
 
   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
-  (*callback->functor_run)(callback, true);
+  grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
 }
 
 static void cq_shutdown_callback(grpc_completion_queue* cq) {
@@ -1385,6 +1385,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
    to zero here, then enter shutdown mode and wake up any waiters */
 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
   GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
   cq->vtable->shutdown(cq);
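Note on the Enqueue changes above: cq_end_op_for_callback and cq_finish_shutdown_callback no longer invoke the functor inline; they defer it to the current thread's ApplicationCallbackExecCtx. An application-side functor for this experimental API, modeled on the TagCallback in the test near the end of this diff, could look like the following sketch (DoneNotifier and on_done_ are hypothetical names, not part of the API):

    #include <grpc/grpc.h>

    // Only functor_run is set by the application; the internal_* fields
    // added above belong to core and must not be touched.
    struct DoneNotifier : grpc_experimental_completion_queue_functor {
      explicit DoneNotifier(void (*on_done)(bool)) : on_done_(on_done) {
        functor_run = &DoneNotifier::Run;
      }
      static void Run(grpc_experimental_completion_queue_functor* f, int ok) {
        auto* self = static_cast<DoneNotifier*>(f);
        self->on_done_(ok != 0);  // core passes non-zero on success
        delete self;              // one-shot: frees itself after running
      }
      void (*on_done_)(bool);
    };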
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index cdfd3336437..c20796f5acf 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -1302,6 +1302,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
   listener* l;
   shutdown_tag* sdt;
   channel_broadcaster broadcaster;
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
 
   GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
@@ -1369,6 +1370,7 @@
 
 void grpc_server_cancel_all_calls(grpc_server* server) {
   channel_broadcaster broadcaster;
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
 
   GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
@@ -1384,6 +1386,7 @@
 
 void grpc_server_destroy(grpc_server* server) {
   listener* l;
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
 
   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
@@ -1469,6 +1472,7 @@ grpc_call_error grpc_server_request_call(
     grpc_completion_queue* cq_bound_to_call,
     grpc_completion_queue* cq_for_notification, void* tag) {
   grpc_call_error error;
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
@@ -1515,11 +1519,11 @@ grpc_call_error grpc_server_request_registered_call(
     grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
     grpc_completion_queue* cq_bound_to_call,
     grpc_completion_queue* cq_for_notification, void* tag) {
-  grpc_call_error error;
+  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
   grpc_core::ExecCtx exec_ctx;
+  GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
   registered_method* rm = static_cast<registered_method*>(rmp);
-  GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
   GRPC_API_TRACE(
       "grpc_server_request_registered_call("
       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@@ -1537,19 +1541,17 @@ grpc_call_error grpc_server_request_registered_call(
   }
   if (cq_idx == server->cq_count) {
     gpr_free(rc);
-    error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
-    goto done;
+    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
   }
   if ((optional_payload == nullptr) !=
       (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
     gpr_free(rc);
-    error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
-    goto done;
+    return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
   }
+
   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
     gpr_free(rc);
-    error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
-    goto done;
+    return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
   }
   rc->cq_idx = cq_idx;
   rc->type = REGISTERED_CALL;
@@ -1561,10 +1563,7 @@ grpc_call_error grpc_server_request_registered_call(
   rc->data.registered.deadline = deadline;
   rc->initial_metadata = initial_metadata;
   rc->data.registered.optional_payload = optional_payload;
-  error = queue_call_request(server, cq_idx, rc);
-done:
-
-  return error;
+  return queue_call_request(server, cq_idx, rc);
 }
 
 static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc
index 148f0b9bc94..6bfe26f04c4 100644
--- a/src/cpp/common/alarm.cc
+++ b/src/cpp/common/alarm.cc
@@ -52,6 +52,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
     return true;
   }
   void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
+    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
     grpc_core::ExecCtx exec_ctx;
     GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
     cq_ = cq->cq();
@@ -72,6 +73,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
                     &on_alarm_);
   }
   void Set(gpr_timespec deadline, std::function<void(bool)> f) {
+    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
     grpc_core::ExecCtx exec_ctx;
     // Don't use any CQ at all. Instead just use the timer to fire the function
     callback_ = std::move(f);
@@ -87,6 +89,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
                     &on_alarm_);
   }
   void Cancel() {
+    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
     grpc_core::ExecCtx exec_ctx;
     grpc_timer_cancel(&timer_);
   }
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 13741ce7aa5..12aa52ef704 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -59,7 +59,15 @@ namespace {
 #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
 
 // How many callback requests of each method should we pre-register at start
-#define DEFAULT_CALLBACK_REQS_PER_METHOD 32
+#define DEFAULT_CALLBACK_REQS_PER_METHOD 512
+
+// What is the (soft) limit for outstanding requests in the server
+#define MAXIMUM_CALLBACK_REQS_OUTSTANDING 30000
+
+// If the number of unmatched requests for a method drops below this amount,
+// try to allocate extra unless it pushes the total number of callbacks above
+// the soft maximum
+#define SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD 128
 
 class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
  public:
@@ -343,9 +351,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
 
 class Server::CallbackRequest final : public internal::CompletionQueueTag {
  public:
-  CallbackRequest(Server* server, internal::RpcServiceMethod* method,
-                  void* method_tag)
+  CallbackRequest(Server* server, Server::MethodReqList* list,
+                  internal::RpcServiceMethod* method, void* method_tag)
       : server_(server),
+        req_list_(list),
         method_(method),
         method_tag_(method_tag),
         has_request_payload_(
@@ -353,12 +362,22 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
             method->method_type() == internal::RpcMethod::SERVER_STREAMING),
         cq_(server->CallbackCQ()),
         tag_(this) {
+    server_->callback_reqs_outstanding_++;
     Setup();
   }
 
-  ~CallbackRequest() { Clear(); }
+  ~CallbackRequest() {
+    Clear();
 
-  void Request() {
+    // The counter of outstanding requests must be decremented
+    // under a lock in case it causes the server shutdown.
+    std::lock_guard<std::mutex> l(server_->callback_reqs_mu_);
+    if (--server_->callback_reqs_outstanding_ == 0) {
+      server_->callback_reqs_done_cv_.notify_one();
+    }
+  }
+
+  bool Request() {
     if (method_tag_) {
       if (GRPC_CALL_OK !=
           grpc_server_request_registered_call(
@@ -366,7 +385,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
               &request_metadata_,
               has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
              cq_->cq(), static_cast<void*>(&tag_))) {
-        return;
+        return false;
       }
     } else {
       if (!call_details_) {
@@ -376,9 +395,10 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
       if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
                                    &request_metadata_, cq_->cq(), cq_->cq(),
                                    static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
-        return;
+        return false;
       }
     }
+    return true;
   }
 
   bool FinalizeResult(void** tag, bool* status) override { return false; }
@@ -409,10 +429,48 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
       GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
       GPR_ASSERT(ignored == req_);
 
-      if (!ok) {
-        // The call has been shutdown
-        req_->Clear();
-        return;
+      bool spawn_new = false;
+      {
+        std::unique_lock<std::mutex> l(req_->req_list_->reqs_mu);
+        req_->req_list_->reqs_list.erase(req_->req_list_iterator_);
+        req_->req_list_->reqs_list_sz--;
+        if (!ok) {
+          // The call has been shutdown.
+          // Delete its contents to free up the request.
+          // First release the lock in case the deletion of the request
+          // completes the full server shutdown and allows the destructor
+          // of the req_list to proceed.
+          l.unlock();
+          delete req_;
+          return;
+        }
+
+        // If this was the last request in the list or it is below the soft
+        // minimum and there are spare requests available, set up a new one, but
+        // do it outside the lock since the Request could otherwise deadlock
+        if (req_->req_list_->reqs_list_sz == 0 ||
+            (req_->req_list_->reqs_list_sz <
+                 SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
+             req_->server_->callback_reqs_outstanding_ <
+                 MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
+          spawn_new = true;
+        }
+      }
+      if (spawn_new) {
+        auto* new_req = new CallbackRequest(req_->server_, req_->req_list_,
+                                            req_->method_, req_->method_tag_);
+        if (!new_req->Request()) {
+          // The server must have just decided to shutdown. Erase
+          // from the list under lock but release the lock before
+          // deleting the new_req (in case that request was what
+          // would allow the destruction of the req_list)
+          {
+            std::lock_guard<std::mutex> l(new_req->req_list_->reqs_mu);
+            new_req->req_list_->reqs_list.erase(new_req->req_list_iterator_);
+            new_req->req_list_->reqs_list_sz--;
+          }
+          delete new_req;
+        }
       }
 
       // Bind the call, deadline, and metadata from what we got
@@ -462,17 +520,30 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
           internal::MethodHandler::HandlerParameter(
               call_, &req_->ctx_, req_->request_, req_->request_status_,
               [this] {
-                req_->Reset();
-                req_->Request();
+                // Recycle this request if there aren't too many outstanding.
+                // Note that we don't have to worry about a case where there
+                // are no requests waiting to match for this method since that
+                // is already taken care of when binding a request to a call.
+                // TODO(vjpai): Also don't recycle this request if the dynamic
+                //              load no longer justifies it. Consider measuring
+                //              dynamic load and setting a target accordingly.
+                if (req_->server_->callback_reqs_outstanding_ <
+                    MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
+                  req_->Clear();
+                  req_->Setup();
+                } else {
+                  // We can free up this request because there are too many
+                  delete req_;
+                  return;
+                }
+                if (!req_->Request()) {
+                  // The server must have just decided to shutdown.
+                  delete req_;
+                }
               }));
     }
   };
 
-  void Reset() {
-    Clear();
-    Setup();
-  }
-
   void Clear() {
     if (call_details_) {
       delete call_details_;
@@ -492,9 +563,15 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
     request_payload_ = nullptr;
     request_ = nullptr;
     request_status_ = Status();
+    std::lock_guard<std::mutex> l(req_list_->reqs_mu);
+    req_list_->reqs_list.push_front(this);
+    req_list_->reqs_list_sz++;
+    req_list_iterator_ = req_list_->reqs_list.begin();
   }
 
   Server* const server_;
+  Server::MethodReqList* req_list_;
+  Server::MethodReqList::iterator req_list_iterator_;
   internal::RpcServiceMethod* const method_;
   void* const method_tag_;
   const bool has_request_payload_;
@@ -715,6 +792,13 @@ Server::~Server() {
   }
 
   grpc_server_destroy(server_);
+  for (auto* method_list : callback_reqs_) {
+    // The entries of the method_list should have already been emptied
+    // during Shutdown as each request is failed by Shutdown. Check that
+    // this actually happened.
+    GPR_ASSERT(method_list->reqs_list.empty());
+    delete method_list;
+  }
 }
 
 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -794,10 +878,12 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
       }
     } else {
       // a callback method. Register at least some callback requests
+      callback_reqs_.push_back(new Server::MethodReqList);
+      auto* method_req_list = callback_reqs_.back();
       // TODO(vjpai): Register these dynamically based on need
       for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
-        auto* req = new CallbackRequest(this, method, method_registration_tag);
-        callback_reqs_.emplace_back(req);
+        new CallbackRequest(this, method_req_list, method,
+                            method_registration_tag);
       }
       // Enqueue it so that it will be Request'ed later once
       // all request matchers are created at core server startup
@@ -889,8 +975,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
     (*it)->Start();
   }
 
-  for (auto& cbreq : callback_reqs_) {
-    cbreq->Request();
+  for (auto* cbmethods : callback_reqs_) {
+    for (auto* cbreq : cbmethods->reqs_list) {
+      GPR_ASSERT(cbreq->Request());
+    }
   }
 
   if (default_health_check_service_impl != nullptr) {
@@ -900,49 +988,69 @@
 
 void Server::ShutdownInternal(gpr_timespec deadline) {
   std::unique_lock<std::mutex> lock(mu_);
-  if (!shutdown_) {
-    shutdown_ = true;
+  if (shutdown_) {
+    return;
+  }
 
-    /// The completion queue to use for server shutdown completion notification
-    CompletionQueue shutdown_cq;
-    ShutdownTag shutdown_tag;  // Dummy shutdown tag
-    grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+  shutdown_ = true;
 
-    shutdown_cq.Shutdown();
+  /// The completion queue to use for server shutdown completion notification
+  CompletionQueue shutdown_cq;
+  ShutdownTag shutdown_tag;  // Dummy shutdown tag
+  grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
 
-    void* tag;
-    bool ok;
-    CompletionQueue::NextStatus status =
-        shutdown_cq.AsyncNext(&tag, &ok, deadline);
+  shutdown_cq.Shutdown();
 
-    // If this timed out, it means we are done with the grace period for a clean
-    // shutdown. We should force a shutdown now by cancelling all inflight calls
-    if (status == CompletionQueue::NextStatus::TIMEOUT) {
-      grpc_server_cancel_all_calls(server_);
-    }
-    // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
-    // successfully shutdown
+  void* tag;
+  bool ok;
+  CompletionQueue::NextStatus status =
+      shutdown_cq.AsyncNext(&tag, &ok, deadline);
 
-    // Shutdown all ThreadManagers. This will try to gracefully stop all the
-    // threads in the ThreadManagers (once they process any inflight requests)
-    for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
-      (*it)->Shutdown();  // ThreadManager's Shutdown()
-    }
+  // If this timed out, it means we are done with the grace period for a clean
+  // shutdown. We should force a shutdown now by cancelling all inflight calls
+  if (status == CompletionQueue::NextStatus::TIMEOUT) {
+    grpc_server_cancel_all_calls(server_);
+  }
+  // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+  // successfully shutdown
 
-    // Wait for threads in all ThreadManagers to terminate
-    for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
-      (*it)->Wait();
-    }
+  // Shutdown all ThreadManagers. This will try to gracefully stop all the
+  // threads in the ThreadManagers (once they process any inflight requests)
+  for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+    (*it)->Shutdown();  // ThreadManager's Shutdown()
+  }
 
-    // Drain the shutdown queue (if the previous call to AsyncNext() timed out
-    // and we didn't remove the tag from the queue yet)
-    while (shutdown_cq.Next(&tag, &ok)) {
-      // Nothing to be done here. Just ignore ok and tag values
-    }
+  // Wait for threads in all ThreadManagers to terminate
+  for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+    (*it)->Wait();
+  }
 
-    shutdown_notified_ = true;
-    shutdown_cv_.notify_all();
+  // Wait for all outstanding callback requests to complete
+  // (whether waiting for a match or already active).
+  // We know that no new requests will be created after this point
+  // because they are only created at server startup time or when
+  // we have a successful match on a request. During the shutdown phase,
+  // requests that have not yet matched will be failed rather than
+  // allowed to succeed, which will cause the server to delete the
+  // request and decrement the count. Possibly a request will match before
+  // the shutdown but then find that shutdown has already started by the
+  // time it tries to register a new request. In that case, the registration
+  // will report a failure, indicating a shutdown and again we won't end
+  // up incrementing the counter.
+  {
+    std::unique_lock<std::mutex> cblock(callback_reqs_mu_);
+    callback_reqs_done_cv_.wait(
+        cblock, [this] { return callback_reqs_outstanding_ == 0; });
+  }
+
+  // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+  // and we didn't remove the tag from the queue yet)
+  while (shutdown_cq.Next(&tag, &ok)) {
+    // Nothing to be done here. Just ignore ok and tag values
   }
+
+  shutdown_notified_ = true;
+  shutdown_cv_.notify_all();
 }
 
 void Server::Wait() {
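Note on the shutdown gate above: increments of callback_reqs_outstanding_ happen lock-free on the hot path, while the decrement that may release the waiter (in ~CallbackRequest) happens under callback_reqs_mu_, so the notification cannot be lost between the waiter's predicate check and its sleep. The same gate in isolation, with illustrative names (not gRPC code):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    class OutstandingGate {
     public:
      // Hot path: no lock needed just to record one more outstanding item.
      void Inc() { outstanding_.fetch_add(1, std::memory_order_relaxed); }

      // Cold path: hold the lock so notify_one cannot fire between the
      // waiter's predicate check and its sleep (a lost wakeup).
      void Dec() {
        std::lock_guard<std::mutex> l(mu_);
        if (outstanding_.fetch_sub(1) == 1) {  // this was the last item
          done_cv_.notify_one();
        }
      }

      void WaitUntilDone() {
        std::unique_lock<std::mutex> l(mu_);
        done_cv_.wait(l, [this] { return outstanding_.load() == 0; });
      }

     private:
      std::mutex mu_;
      std::condition_variable done_cv_;
      std::atomic<int> outstanding_{0};
    };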
diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc
index a157d75edab..7c3630eaf18 100644
--- a/test/core/surface/completion_queue_test.cc
+++ b/test/core/surface/completion_queue_test.cc
@@ -389,46 +389,49 @@ static void test_callback(void) {
   attr.cq_shutdown_cb = &shutdown_cb;
 
   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
-    grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
-    attr.cq_polling_type = polling_types[pidx];
-    cc = grpc_completion_queue_create(
-        grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
-
+    int sumtags = 0;
     int counter = 0;
-    class TagCallback : public grpc_experimental_completion_queue_functor {
-     public:
-      TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
-        functor_run = &TagCallback::Run;
-      }
-      ~TagCallback() {}
-      static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
-        GPR_ASSERT(static_cast<bool>(ok));
-        auto* callback = static_cast<TagCallback*>(cb);
-        *callback->counter_ += callback->tag_;
-        grpc_core::Delete(callback);
+    {
+      // reset exec_ctx types
+      grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
+      grpc_core::ExecCtx exec_ctx;
+      attr.cq_polling_type = polling_types[pidx];
+      cc = grpc_completion_queue_create(
+          grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
+
+      class TagCallback : public grpc_experimental_completion_queue_functor {
+       public:
+        TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
+          functor_run = &TagCallback::Run;
+        }
+        ~TagCallback() {}
+        static void Run(grpc_experimental_completion_queue_functor* cb,
+                        int ok) {
+          GPR_ASSERT(static_cast<bool>(ok));
+          auto* callback = static_cast<TagCallback*>(cb);
+          *callback->counter_ += callback->tag_;
+          grpc_core::Delete(callback);
+        };
+
+       private:
+        int* counter_;
+        int tag_;
       };
 
-     private:
-      int* counter_;
-      int tag_;
-    };
+      for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+        tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
+        sumtags += i;
+      }
 
-    int sumtags = 0;
-    for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-      tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
-      sumtags += i;
-    }
+      for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+        GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
+        grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
+                       nullptr, &completions[i]);
+      }
 
-    for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-      GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
-      grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
-                     nullptr, &completions[i]);
+      shutdown_and_destroy(cc);
     }
 
     GPR_ASSERT(sumtags == counter);
-
-    shutdown_and_destroy(cc);
-
     GPR_ASSERT(got_shutdown);
     got_shutdown = false;
   }
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 650152ecc0d..dcfaa684773 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -101,8 +101,6 @@ class DummyEndpoint : public grpc_endpoint {
     GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
   }
 
-  static grpc_workqueue* get_workqueue(grpc_endpoint* ep) { return nullptr; }
-
   static void add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
 
   static void add_to_pollset_set(grpc_endpoint* ep,
                                  grpc_pollset_set* pollset) {