Dynamic callback requesting, graceful server shutdown, and separate ExecCtx for callbacks

reviewable/pr17686/r3
Vijay Pai 6 years ago
parent afc5b0b5d4
commit fab05d336c
13 changed files:
1. include/grpc/impl/codegen/grpc_types.h (4)
2. include/grpcpp/server.h (30)
3. src/core/lib/iomgr/exec_ctx.cc (1)
4. src/core/lib/iomgr/exec_ctx.h (57)
5. src/core/lib/iomgr/executor.cc (7)
6. src/core/lib/iomgr/timer_manager.cc (7)
7. src/core/lib/surface/call.cc (6)
8. src/core/lib/surface/completion_queue.cc (5)
9. src/core/lib/surface/server.cc (23)
10. src/cpp/common/alarm.cc (3)
11. src/cpp/server/server_cc.cc (220)
12. test/core/surface/completion_queue_test.cc (69)
13. test/cpp/microbenchmarks/bm_chttp2_transport.cc (2)

@ -693,6 +693,10 @@ typedef struct grpc_experimental_completion_queue_functor {
pointer to this functor and a boolean that indicates whether the
operation succeeded (non-zero) or failed (zero) */
void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
/** The following fields are not API. They are meant for internal use. */
int internal_success;
struct grpc_experimental_completion_queue_functor* internal_next;
} grpc_experimental_completion_queue_functor;
/* The upgrade to version 2 is currently experimental. */
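For orientation, the contract of this struct is: the library records the completion status in internal_success, chains pending functors through internal_next, and eventually invokes functor_run with the functor pointer and the success flag. A minimal application-side sketch (name hypothetical; it mirrors the TagCallback used in the completion_queue test later in this diff):

// Hypothetical application callback built on the experimental functor API.
// Only functor_run is set by the application; the internal_* fields are
// reserved for the library's own chaining and must not be touched.
class CountingTag : public grpc_experimental_completion_queue_functor {
 public:
  explicit CountingTag(int* counter) : counter_(counter) {
    functor_run = &CountingTag::Run;
  }

 private:
  static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
    auto* self = static_cast<CountingTag*>(cb);
    if (ok != 0) ++(*self->counter_);  // non-zero means the op succeeded
  }
  int* counter_;
};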

@ -248,8 +248,22 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
/// the \a sync_server_cqs)
std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
/// Outstanding callback requests
std::vector<std::unique_ptr<CallbackRequest>> callback_reqs_;
// Outstanding callback requests. The vector is indexed by method with a
// list per method. Each element should store its own iterator
// in the list and should erase it when the request is actually bound to
an RPC. Synchronize this list with its own mutex (reqs_mu, not the server mu_)
since these requests must remain accessible during Shutdown, when the server mu_ is held.
// TODO(vjpai): Merge with the core request matcher to avoid duplicate work
struct MethodReqList {
std::mutex reqs_mu;
// Maintain our own list size count since list::size is still linear
// for some libraries (supposed to be constant since C++11)
// TODO(vjpai): Remove reqs_list_sz and use list::size when possible
size_t reqs_list_sz{0};
std::list<CallbackRequest*> reqs_list;
using iterator = decltype(reqs_list)::iterator;
};
std::vector<MethodReqList*> callback_reqs_;
// Server status
std::mutex mu_;
@ -259,6 +273,18 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
std::condition_variable shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_.
// Incrementing callback_reqs_outstanding_ is ok without a lock
// but it should only be decremented under the lock in case it is the
// last request and enables the server shutdown. The increment is
// performance-critical since it happens during periods of increasing
// load; the decrement happens only when memory is maxed out, during server
// shutdown, or (possibly in a future version) during decreasing load, so
// it is less performance-critical.
std::mutex callback_reqs_mu_;
std::condition_variable callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_;
std::vector<grpc::string> services_;
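To make the ownership rules around MethodReqList concrete, here is a rough sketch of the add/remove protocol each CallbackRequest is expected to follow. The helpers are hypothetical (CallbackRequest and MethodReqList are private members of Server); in server_cc.cc below the same logic lives inline in Setup() and in the completion tag.

// Sketch: a request pushes itself onto the front of its method's list and
// remembers its own iterator; whoever later binds (or fails) the request
// erases exactly that iterator, keeping reqs_list_sz in step with the list.
void AddSelf(Server::MethodReqList* list, Server::CallbackRequest* req,
             Server::MethodReqList::iterator* self_it) {
  std::lock_guard<std::mutex> l(list->reqs_mu);
  list->reqs_list.push_front(req);
  list->reqs_list_sz++;
  *self_it = list->reqs_list.begin();
}

void RemoveSelf(Server::MethodReqList* list,
                Server::MethodReqList::iterator self_it) {
  std::lock_guard<std::mutex> l(list->reqs_mu);
  list->reqs_list.erase(self_it);
  list->reqs_list_sz--;
}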

@ -115,6 +115,7 @@ grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
namespace grpc_core {
GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_);
GPR_TLS_CLASS_DEF(ApplicationCallbackExecCtx::callback_exec_ctx_);
// WARNING: for testing purposes only!
void ExecCtx::TestOnlyGlobalInit(gpr_timespec new_val) {

@ -21,12 +21,14 @@
#include <grpc/support/port_platform.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/closure.h"
typedef int64_t grpc_millis;
@ -34,9 +36,8 @@ typedef int64_t grpc_millis;
#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
typedef struct grpc_workqueue grpc_workqueue;
/** A combiner represents a list of work to be executed later.
Forward declared here to avoid a circular dependency with combiner.h. */
typedef struct grpc_combiner grpc_combiner;
/* This exec_ctx is ready to return: either pre-populated, or cached as soon as
@ -226,6 +227,56 @@ class ExecCtx {
GPR_TLS_CLASS_DECL(exec_ctx_);
ExecCtx* last_exec_ctx_ = Get();
};
class ApplicationCallbackExecCtx {
public:
ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
grpc_core::Fork::IncExecCtxCount();
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(this));
}
}
~ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == this) {
while (head_ != nullptr) {
auto* f = head_;
head_ = f->internal_next;
if (f->internal_next == nullptr) {
tail_ = nullptr;
}
(*f->functor_run)(f, f->internal_success);
}
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
grpc_core::Fork::DecExecCtxCount();
} else {
GPR_DEBUG_ASSERT(head_ == nullptr);
GPR_DEBUG_ASSERT(tail_ == nullptr);
}
}
static void Enqueue(grpc_experimental_completion_queue_functor* functor,
int is_success) {
functor->internal_success = is_success;
functor->internal_next = nullptr;
auto* ctx = reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_));
if (ctx->head_ == nullptr) {
ctx->head_ = functor;
}
if (ctx->tail_ != nullptr) {
ctx->tail_->internal_next = functor;
}
ctx->tail_ = functor;
}
private:
grpc_experimental_completion_queue_functor* head_{nullptr};
grpc_experimental_completion_queue_functor* tail_{nullptr};
GPR_TLS_CLASS_DECL(callback_exec_ctx_);
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */
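The intended usage, judging from the call sites later in this diff (call.cc, completion_queue.cc, server.cc, alarm.cc), is to declare an ApplicationCallbackExecCtx just before the ExecCtx at every top-level entry point. A rough sketch of the resulting lifecycle, with a hypothetical entry-point name:

// Sketch: declaration order matters. ExecCtx (declared second) is destroyed
// first and flushes core closures; the outermost ApplicationCallbackExecCtx
// is destroyed last and only then drains head_/tail_, running each
// application functor exactly once, outside any core locks.
void some_surface_api_call() {  // hypothetical entry point
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  // ... core work; completions destined for a callback CQ are handed to
  // ApplicationCallbackExecCtx::Enqueue() instead of running inline ...
}  // ~ExecCtx flushes, then ~ApplicationCallbackExecCtx runs the functors

Nested declarations are cheap no-ops: the constructor only claims the thread-local slot if it is empty, and the destructor only drains when it owns that slot.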

@ -111,6 +111,13 @@ size_t Executor::RunClosures(const char* executor_name,
grpc_closure_list list) {
size_t n = 0;
// In the executor, the ExecCtx for the thread is declared
// in the executor thread itself, but this is the point where we
// could start seeing application-level callbacks. No need to
// create a new ExecCtx, though, since there already is one and it is
// flushed (but not destructed) in this function itself
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_closure* c = list.head;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;

@ -105,6 +105,13 @@ void grpc_timer_manager_tick() {
}
static void run_some_timers() {
// In the case of timers, the ExecCtx for the thread is declared
// in the timer thread itself, but this is the point where we
// could start seeing application-level callbacks. No need to
// create a new ExecCtx, though, since there already is one and it is
// flushed (but not destructed) in this function itself
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary
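Both hunks above (executor.cc and timer_manager.cc) follow the same pattern: the worker thread already owns a long-lived ExecCtx, so only an ApplicationCallbackExecCtx is added at the point where closures, and therefore application functors, can be produced. A simplified sketch of the shape, not the actual executor code:

// Sketch only: the callback context scopes one batch of closures, so any
// functors enqueued while executing the batch are drained when this
// function returns, using the thread's existing ExecCtx for core work.
void run_one_batch_sketch(grpc_closure_list batch) {  // hypothetical
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  for (grpc_closure* c = batch.head; c != nullptr;) {
    grpc_closure* next = c->next_data.next;
    // ... run c ...
    c = next;
  }
}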

@ -556,6 +556,7 @@ void grpc_call_unref(grpc_call* c) {
GPR_TIMER_SCOPE("grpc_call_unref", 0);
child_call* cc = c->child;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
@ -597,6 +598,7 @@ void grpc_call_unref(grpc_call* c) {
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
cancel_with_error(call, GRPC_ERROR_CANCELLED);
return GRPC_CALL_OK;
@ -646,6 +648,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
grpc_status_code status,
const char* description,
void* reserved) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE(
"grpc_call_cancel_with_status("
@ -1894,7 +1897,6 @@ done_with_error:
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag, void* reserved) {
grpc_core::ExecCtx exec_ctx;
grpc_call_error err;
GRPC_API_TRACE(
@ -1905,6 +1907,8 @@ grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
if (reserved != nullptr) {
err = GRPC_CALL_ERROR;
} else {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
err = call_start_batch(call, ops, nops, tag, 0);
}

@ -868,7 +868,7 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error);
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
(*functor->functor_run)(functor, is_success);
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@ -1352,7 +1352,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
(*callback->functor_run)(callback, true);
grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {
@ -1385,6 +1385,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
cq->vtable->shutdown(cq);
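The two replaced call sites above are the behavioral core of this change: completions for callback-based CQs are no longer run inline (possibly while core locks are held) but appended to the current thread's ApplicationCallbackExecCtx. A small sketch of what that defers; both arguments are assumed to be application functors with functor_run already set:

// Sketch only, not real surface code.
void enqueue_two_sketch(grpc_experimental_completion_queue_functor* functor_a,
                        grpc_experimental_completion_queue_functor* functor_b) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  grpc_core::ApplicationCallbackExecCtx::Enqueue(functor_a, /*is_success=*/1);
  grpc_core::ApplicationCallbackExecCtx::Enqueue(functor_b, /*is_success=*/1);
  // Neither functor has run yet here.
}  // ~ExecCtx flushes core work; then the outermost callback ctx runs
   // functor_a followed by functor_b (FIFO).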

@ -1302,6 +1302,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
listener* l;
shutdown_tag* sdt;
channel_broadcaster broadcaster;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
@ -1369,6 +1370,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
void grpc_server_cancel_all_calls(grpc_server* server) {
channel_broadcaster broadcaster;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
@ -1384,6 +1386,7 @@ void grpc_server_cancel_all_calls(grpc_server* server) {
void grpc_server_destroy(grpc_server* server) {
listener* l;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
@ -1469,6 +1472,7 @@ grpc_call_error grpc_server_request_call(
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
@ -1515,11 +1519,11 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
registered_method* rm = static_cast<registered_method*>(rmp);
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@ -1537,19 +1541,17 @@ grpc_call_error grpc_server_request_registered_call(
}
if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
if ((optional_payload == nullptr) !=
(rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
gpr_free(rc);
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
}
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
gpr_free(rc);
error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
goto done;
return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
}
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
@ -1561,10 +1563,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
error = queue_call_request(server, cq_idx, rc);
done:
return error;
return queue_call_request(server, cq_idx, rc);
}
static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,

@ -52,6 +52,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
return true;
}
void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
cq_ = cq->cq();
@ -72,6 +73,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
&on_alarm_);
}
void Set(gpr_timespec deadline, std::function<void(bool)> f) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std::move(f);
@ -87,6 +89,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
&on_alarm_);
}
void Cancel() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_timer_cancel(&timer_);
}

@ -59,7 +59,15 @@ namespace {
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
// How many callback requests of each method should we pre-register at start
#define DEFAULT_CALLBACK_REQS_PER_METHOD 32
#define DEFAULT_CALLBACK_REQS_PER_METHOD 512
// What is the (soft) limit for outstanding requests in the server
#define MAXIMUM_CALLBACK_REQS_OUTSTANDING 30000
// If the number of unmatched requests for a method drops below this amount,
// try to allocate extra unless it pushes the total number of callbacks above
// the soft maximum
#define SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD 128
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
@ -343,9 +351,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
class Server::CallbackRequest final : public internal::CompletionQueueTag {
public:
CallbackRequest(Server* server, internal::RpcServiceMethod* method,
void* method_tag)
CallbackRequest(Server* server, Server::MethodReqList* list,
internal::RpcServiceMethod* method, void* method_tag)
: server_(server),
req_list_(list),
method_(method),
method_tag_(method_tag),
has_request_payload_(
@ -353,12 +362,22 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
method->method_type() == internal::RpcMethod::SERVER_STREAMING),
cq_(server->CallbackCQ()),
tag_(this) {
server_->callback_reqs_outstanding_++;
Setup();
}
~CallbackRequest() { Clear(); }
~CallbackRequest() {
Clear();
// The counter of outstanding requests must be decremented
// under a lock in case it causes the server shutdown.
std::lock_guard<std::mutex> l(server_->callback_reqs_mu_);
if (--server_->callback_reqs_outstanding_ == 0) {
server_->callback_reqs_done_cv_.notify_one();
}
}
void Request() {
bool Request() {
if (method_tag_) {
if (GRPC_CALL_OK !=
grpc_server_request_registered_call(
@ -366,7 +385,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
&request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
cq_->cq(), static_cast<void*>(&tag_))) {
return;
return false;
}
} else {
if (!call_details_) {
@ -376,9 +395,10 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
&request_metadata_, cq_->cq(), cq_->cq(),
static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
return;
return false;
}
}
return true;
}
bool FinalizeResult(void** tag, bool* status) override { return false; }
@ -409,10 +429,48 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == req_);
if (!ok) {
// The call has been shutdown
req_->Clear();
return;
bool spawn_new = false;
{
std::unique_lock<std::mutex> l(req_->req_list_->reqs_mu);
req_->req_list_->reqs_list.erase(req_->req_list_iterator_);
req_->req_list_->reqs_list_sz--;
if (!ok) {
// The call has been shutdown.
// Delete its contents to free up the request.
// First release the lock in case the deletion of the request
// completes the full server shutdown and allows the destructor
// of the req_list to proceed.
l.unlock();
delete req_;
return;
}
// If this was the last request in the list or it is below the soft
// minimum and there are spare requests available, set up a new one, but
// do it outside the lock since the Request could otherwise deadlock
if (req_->req_list_->reqs_list_sz == 0 ||
(req_->req_list_->reqs_list_sz <
SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
req_->server_->callback_reqs_outstanding_ <
MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
spawn_new = true;
}
}
if (spawn_new) {
auto* new_req = new CallbackRequest(req_->server_, req_->req_list_,
req_->method_, req_->method_tag_);
if (!new_req->Request()) {
// The server must have just decided to shutdown. Erase
// from the list under lock but release the lock before
// deleting the new_req (in case that request was what
// would allow the destruction of the req_list)
{
std::lock_guard<std::mutex> l(new_req->req_list_->reqs_mu);
new_req->req_list_->reqs_list.erase(new_req->req_list_iterator_);
new_req->req_list_->reqs_list_sz--;
}
delete new_req;
}
}
// Bind the call, deadline, and metadata from what we got
@ -462,17 +520,30 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
[this] {
req_->Reset();
req_->Request();
// Recycle this request if there aren't too many outstanding.
// Note that we don't have to worry about a case where there
// are no requests waiting to match for this method since that
// is already taken care of when binding a request to a call.
// TODO(vjpai): Also don't recycle this request if the dynamic
// load no longer justifies it. Consider measuring
// dynamic load and setting a target accordingly.
if (req_->server_->callback_reqs_outstanding_ <
MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
req_->Clear();
req_->Setup();
} else {
// We can free up this request because there are too many
delete req_;
return;
}
if (!req_->Request()) {
// The server must have just decided to shutdown.
delete req_;
}
}));
}
};
void Reset() {
Clear();
Setup();
}
void Clear() {
if (call_details_) {
delete call_details_;
@ -492,9 +563,15 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
request_payload_ = nullptr;
request_ = nullptr;
request_status_ = Status();
std::lock_guard<std::mutex> l(req_list_->reqs_mu);
req_list_->reqs_list.push_front(this);
req_list_->reqs_list_sz++;
req_list_iterator_ = req_list_->reqs_list.begin();
}
Server* const server_;
Server::MethodReqList* req_list_;
Server::MethodReqList::iterator req_list_iterator_;
internal::RpcServiceMethod* const method_;
void* const method_tag_;
const bool has_request_payload_;
@ -715,6 +792,13 @@ Server::~Server() {
}
grpc_server_destroy(server_);
for (auto* method_list : callback_reqs_) {
// The entries of the method_list should have already been emptied
// during Shutdown as each request is failed by Shutdown. Check that
// this actually happened.
GPR_ASSERT(method_list->reqs_list.empty());
delete method_list;
}
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@ -794,10 +878,12 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
}
} else {
// a callback method. Register at least some callback requests
callback_reqs_.push_back(new Server::MethodReqList);
auto* method_req_list = callback_reqs_.back();
// TODO(vjpai): Register these dynamically based on need
for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
auto* req = new CallbackRequest(this, method, method_registration_tag);
callback_reqs_.emplace_back(req);
new CallbackRequest(this, method_req_list, method,
method_registration_tag);
}
// Enqueue it so that it will be Request'ed later once
// all request matchers are created at core server startup
@ -889,8 +975,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
(*it)->Start();
}
for (auto& cbreq : callback_reqs_) {
cbreq->Request();
for (auto* cbmethods : callback_reqs_) {
for (auto* cbreq : cbmethods->reqs_list) {
GPR_ASSERT(cbreq->Request());
}
}
if (default_health_check_service_impl != nullptr) {
@ -900,49 +988,69 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
void Server::ShutdownInternal(gpr_timespec deadline) {
std::unique_lock<std::mutex> lock(mu_);
if (!shutdown_) {
shutdown_ = true;
if (shutdown_) {
return;
}
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
shutdown_ = true;
shutdown_cq.Shutdown();
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq.AsyncNext(&tag, &ok, deadline);
shutdown_cq.Shutdown();
// If this timed out, it means we are done with the grace period for a clean
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq.AsyncNext(&tag, &ok, deadline);
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
// If this timed out, it means we are done with the grace period for a clean
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
}
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here. Just ignore ok and tag values
}
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
}
shutdown_notified_ = true;
shutdown_cv_.notify_all();
// Wait for all outstanding callback requests to complete
// (whether waiting for a match or already active).
// We know that no new requests will be created after this point
// because they are only created at server startup time or when
// we have a successful match on a request. During the shutdown phase,
// requests that have not yet matched will be failed rather than
// allowed to succeed, which will cause the server to delete the
// request and decrement the count. Possibly a request will match before
// the shutdown but then find that shutdown has already started by the
// time it tries to register a new request. In that case, the registration
// will report a failure, indicating a shutdown and again we won't end
// up incrementing the counter.
{
std::unique_lock<std::mutex> cblock(callback_reqs_mu_);
callback_reqs_done_cv_.wait(
cblock, [this] { return callback_reqs_outstanding_ == 0; });
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here. Just ignore ok and tag values
}
shutdown_notified_ = true;
shutdown_cv_.notify_all();
}
void Server::Wait() {
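Two server_cc.cc mechanisms above are worth restating compactly. First, the refill policy in the completion tag: a matched request spawns a replacement when its method's unmatched list is empty, or when it is below the soft per-method minimum and the server-wide total is still under the soft maximum. Second, ShutdownInternal now blocks on callback_reqs_done_cv_ until callback_reqs_outstanding_ reaches zero, which the CallbackRequest destructor signals under callback_reqs_mu_. A hypothetical helper expressing just the refill predicate:

// Not part of the change; it only restates the condition used in the
// completion tag's functor when deciding whether to allocate a new request.
bool should_spawn_replacement(size_t unmatched_for_method,
                              int total_outstanding) {
  return unmatched_for_method == 0 ||
         (unmatched_for_method < SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
          total_outstanding < MAXIMUM_CALLBACK_REQS_OUTSTANDING);
}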

@ -389,46 +389,49 @@ static void test_callback(void) {
attr.cq_shutdown_cb = &shutdown_cb;
for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
grpc_core::ExecCtx exec_ctx; // reset exec_ctx
attr.cq_polling_type = polling_types[pidx];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
int sumtags = 0;
int counter = 0;
class TagCallback : public grpc_experimental_completion_queue_functor {
public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb);
*callback->counter_ += callback->tag_;
grpc_core::Delete(callback);
{
// reset exec_ctx types
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
attr.cq_polling_type = polling_types[pidx];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
class TagCallback : public grpc_experimental_completion_queue_functor {
public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb,
int ok) {
GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb);
*callback->counter_ += callback->tag_;
grpc_core::Delete(callback);
};
private:
int* counter_;
int tag_;
};
private:
int* counter_;
int tag_;
};
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
sumtags += i;
}
int sumtags = 0;
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
sumtags += i;
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
nullptr, &completions[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
nullptr, &completions[i]);
shutdown_and_destroy(cc);
}
GPR_ASSERT(sumtags == counter);
shutdown_and_destroy(cc);
GPR_ASSERT(got_shutdown);
got_shutdown = false;
}
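The restructuring of the loop body is a direct consequence of deferred execution: the TagCallback functors now run only when the ApplicationCallbackExecCtx introduced at the top of the block is destroyed, so the counter check has to move outside that scope. In sketch form:

{
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  // ... grpc_cq_begin_op()/grpc_cq_end_op() for every tag ...
  // counter is still 0 here; the functors have only been enqueued.
  shutdown_and_destroy(cc);
}
GPR_ASSERT(sumtags == counter);  // now every TagCallback::Run has executed

The same reasoning explains why GPR_ASSERT(got_shutdown) stays after the closing brace: the cq shutdown callback is also enqueued on the outermost callback context rather than run inline.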

@ -101,8 +101,6 @@ class DummyEndpoint : public grpc_endpoint {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
}
static grpc_workqueue* get_workqueue(grpc_endpoint* ep) { return nullptr; }
static void add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
static void add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) {
