From d4680eb8eb3735b486ac211e79d35e007e45cd57 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 2 May 2022 20:59:44 -0700 Subject: [PATCH] Revert "health check service: rewrite using callback API (#29457)" (#29559) This reverts commit 2b00c7d2ad8ed228374fcf67e205a4742776f8de. --- .../health/default_health_check_service.cc | 386 ++++++++++++------ .../health/default_health_check_service.h | 233 +++++++++-- src/cpp/server/server_cc.cc | 29 +- 3 files changed, 483 insertions(+), 165 deletions(-) diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 608d98d5dd9..d0114f0f9fa 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -44,7 +44,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() { void DefaultHealthCheckService::SetServingStatus( const std::string& service_name, bool serving) { - grpc::internal::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { // Set to NOT_SERVING in case service_name is not in the map. serving = false; @@ -54,8 +54,10 @@ void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(bool serving) { const ServingStatus status = serving ? SERVING : NOT_SERVING; - grpc::internal::MutexLock lock(&mu_); - if (shutdown_) return; + grpc_core::MutexLock lock(&mu_); + if (shutdown_) { + return; + } for (auto& p : services_map_) { ServiceData& service_data = p.second; service_data.SetServingStatus(status); @@ -63,8 +65,10 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) { } void DefaultHealthCheckService::Shutdown() { - grpc::internal::MutexLock lock(&mu_); - if (shutdown_) return; + grpc_core::MutexLock lock(&mu_); + if (shutdown_) { + return; + } shutdown_ = true; for (auto& p : services_map_) { ServiceData& service_data = p.second; @@ -75,37 +79,43 @@ void DefaultHealthCheckService::Shutdown() { DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( const std::string& service_name) const { - grpc::internal::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); - if (it == services_map_.end()) return NOT_FOUND; + if (it == services_map_.end()) { + return NOT_FOUND; + } const ServiceData& service_data = it->second; return service_data.GetServingStatus(); } -void DefaultHealthCheckService::RegisterWatch( +void DefaultHealthCheckService::RegisterCallHandler( const std::string& service_name, - grpc_core::RefCountedPtr watcher) { - grpc::internal::MutexLock lock(&mu_); + std::shared_ptr handler) { + grpc_core::MutexLock lock(&mu_); ServiceData& service_data = services_map_[service_name]; - watcher->SendHealth(service_data.GetServingStatus()); - service_data.AddWatch(std::move(watcher)); + service_data.AddCallHandler(handler /* copies ref */); + HealthCheckServiceImpl::CallHandler* h = handler.get(); + h->SendHealth(std::move(handler), service_data.GetServingStatus()); } -void DefaultHealthCheckService::UnregisterWatch( +void DefaultHealthCheckService::UnregisterCallHandler( const std::string& service_name, - HealthCheckServiceImpl::WatchReactor* watcher) { - grpc::internal::MutexLock lock(&mu_); + const std::shared_ptr& handler) { + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) return; ServiceData& service_data = it->second; - service_data.RemoveWatch(watcher); - if (service_data.Unused()) services_map_.erase(it); + service_data.RemoveCallHandler(handler); + if (service_data.Unused()) { + services_map_.erase(it); + } } DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService() { +DefaultHealthCheckService::GetHealthCheckService( + std::unique_ptr cq) { GPR_ASSERT(impl_ == nullptr); - impl_ = absl::make_unique(this); + impl_ = absl::make_unique(this, std::move(cq)); return impl_.get(); } @@ -116,19 +126,19 @@ DefaultHealthCheckService::GetHealthCheckService() { void DefaultHealthCheckService::ServiceData::SetServingStatus( ServingStatus status) { status_ = status; - for (const auto& p : watchers_) { - p.first->SendHealth(status); + for (auto& call_handler : call_handlers_) { + call_handler->SendHealth(call_handler /* copies ref */, status); } } -void DefaultHealthCheckService::ServiceData::AddWatch( - grpc_core::RefCountedPtr watcher) { - watchers_[watcher.get()] = std::move(watcher); +void DefaultHealthCheckService::ServiceData::AddCallHandler( + std::shared_ptr handler) { + call_handlers_.insert(std::move(handler)); } -void DefaultHealthCheckService::ServiceData::RemoveWatch( - HealthCheckServiceImpl::WatchReactor* watcher) { - watchers_.erase(watcher); +void DefaultHealthCheckService::ServiceData::RemoveCallHandler( + const std::shared_ptr& handler) { + call_handlers_.erase(handler); } // @@ -141,57 +151,53 @@ const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* database) - : database_(database) { + DefaultHealthCheckService* database, + std::unique_ptr cq) + : database_(database), cq_(std::move(cq)) { // Add Check() method. AddMethod(new internal::RpcServiceMethod( kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); - MarkMethodCallback( - 0, new internal::CallbackUnaryHandler( - [database](CallbackServerContext* context, - const ByteBuffer* request, ByteBuffer* response) { - return HandleCheckRequest(database, context, request, response); - })); // Add Watch() method. AddMethod(new internal::RpcServiceMethod( kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); - MarkMethodCallback( - 1, new internal::CallbackServerStreamingHandler( - [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) { - return new WatchReactor(this, request); - })); + // Create serving thread. + thread_ = absl::make_unique("grpc_health_check_service", + Serve, this); } DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { - grpc::internal::MutexLock lock(&mu_); + // We will reach here after the server starts shutting down. shutdown_ = true; - while (num_watches_ > 0) { - shutdown_condition_.Wait(&mu_); + { + grpc_core::MutexLock lock(&cq_shutdown_mu_); + cq_->Shutdown(); } + thread_->Join(); } -ServerUnaryReactor* -DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest( - DefaultHealthCheckService* database, CallbackServerContext* context, - const ByteBuffer* request, ByteBuffer* response) { - auto* reactor = context->DefaultReactor(); - std::string service_name; - if (!DecodeRequest(*request, &service_name)) { - reactor->Finish( - Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); - return reactor; - } - ServingStatus serving_status = database->GetServingStatus(service_name); - if (serving_status == NOT_FOUND) { - reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown")); - return reactor; - } - if (!EncodeResponse(serving_status, response)) { - reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response")); - return reactor; +void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { + // Request the calls we're interested in. + // We do this before starting the serving thread, so that we know it's + // done before server startup is complete. + CheckCallHandler::CreateAndStart(cq_.get(), database_, this); + WatchCallHandler::CreateAndStart(cq_.get(), database_, this); + // Start serving thread. + thread_->Start(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { + HealthCheckServiceImpl* service = static_cast(arg); + void* tag; + bool ok; + while (true) { + if (!service->cq_->Next(&tag, &ok)) { + // The completion queue is shutting down. + GPR_ASSERT(service->shutdown_); + break; + } + auto* next_step = static_cast(tag); + next_step->Run(ok); } - reactor->Finish(Status::OK); - return reactor; } bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( @@ -242,97 +248,245 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( } // -// DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor +// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler // -DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor( - HealthCheckServiceImpl* service, const ByteBuffer* request) - : service_(service) { +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr self = + std::make_shared(cq, database, service); + CheckCallHandler* handler = static_cast(self.get()); { - grpc::internal::MutexLock lock(&service_->mu_); - ++service_->num_watches_; + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); + if (service->shutdown_) return; + // Request a Check() call. + handler->next_ = + CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, + &handler->writer_, cq, cq, &handler->next_); } - if (!DecodeRequest(*request, &service_name_)) { - Finish(Status(StatusCode::INTERNAL, "could not parse request")); +} + +DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnCallReceived(std::shared_ptr self, bool ok) { + if (!ok) { + // The value of ok being false means that the server is shutting down. + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Process request. + gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, + this); + std::string service_name; + grpc::Status status = Status::OK; + ByteBuffer response; + if (!service_->DecodeRequest(request_, &service_name)) { + status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); + } else { + ServingStatus serving_status = database_->GetServingStatus(service_name); + if (serving_status == NOT_FOUND) { + status = Status(StatusCode::NOT_FOUND, "service name unknown"); + } else if (!service_->EncodeResponse(serving_status, &response)) { + status = Status(StatusCode::INTERNAL, "could not encode response"); + } + } + // Send response. + { + grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); + if (!service_->shutdown_) { + next_ = + CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + if (status.ok()) { + writer_.Finish(response, status, &next_); + } else { + writer_.FinishWithError(status, &next_); + } + } + } +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnFinishDone(std::shared_ptr self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", + service_, this); + } + self.reset(); // To appease clang-tidy. +} + +// +// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr self = + std::make_shared(cq, database, service); + WatchCallHandler* handler = static_cast(self.get()); + { + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); + if (service->shutdown_) return; + // Request AsyncNotifyWhenDone(). + handler->on_done_notified_ = + CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, + std::placeholders::_1, std::placeholders::_2), + self /* copies ref */); + handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); + // Request a Watch() call. + handler->next_ = + CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, + &handler->stream_, cq, cq, + &handler->next_); + } +} + +DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), stream_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnCallReceived(std::shared_ptr self, bool ok) { + if (!ok) { + // Server shutting down. + // + // AsyncNotifyWhenDone() needs to be called before the call starts, but the + // tag will not pop out if the call never starts ( + // https://github.com/grpc/grpc/issues/10136). So we need to manually + // release the ownership of the handler in this case. + GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Parse request. + if (!service_->DecodeRequest(request_, &service_name_)) { + SendFinish(std::move(self), + Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); return; } // Register the call for updates to the service. gpr_log(GPR_DEBUG, - "[HCS %p] Health watch started for service \"%s\" (reactor: %p)", + "[HCS %p] Health watch started for service \"%s\" (handler: %p)", service_, service_name_.c_str(), this); - service_->database_->RegisterWatch(service_name_, Ref()); + database_->RegisterCallHandler(service_name_, std::move(self)); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - SendHealth(ServingStatus status) { - grpc::internal::MutexLock lock(&mu_); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealth(std::shared_ptr self, ServingStatus status) { + grpc_core::MutexLock lock(&send_mu_); // If there's already a send in flight, cache the new status, and // we'll start a new send for it when the one in flight completes. - if (write_pending_) { + if (send_in_flight_) { pending_status_ = status; return; } // Start a send. - SendHealthLocked(status); + SendHealthLocked(std::move(self), status); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - SendHealthLocked(ServingStatus status) { - // Check if we're shutting down. - { - grpc::internal::MutexLock lock(&service_->mu_); - if (service_->shutdown_) { - Finish(Status::CANCELLED); - return; - } +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealthLocked(std::shared_ptr self, ServingStatus status) { + send_in_flight_ = true; + // Construct response. + ByteBuffer response; + bool success = service_->EncodeResponse(status, &response); + // Grab shutdown lock and send response. + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); + if (service_->shutdown_) { + SendFinishLocked(std::move(self), Status::CANCELLED); + return; } - // Send response. - bool success = EncodeResponse(status, &response_); if (!success) { - Finish(Status(StatusCode::INTERNAL, "could not encode response")); + SendFinishLocked(std::move(self), + Status(StatusCode::INTERNAL, "could not encode response")); return; } - write_pending_ = true; - StartWrite(&response_); + next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Write(response, &next_); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - OnWriteDone(bool ok) { - response_.Clear(); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnSendHealthDone(std::shared_ptr self, bool ok) { if (!ok) { - Finish(Status::CANCELLED); + SendFinish(std::move(self), Status::CANCELLED); return; } - grpc::internal::MutexLock lock(&mu_); - write_pending_ = false; + grpc_core::MutexLock lock(&send_mu_); + send_in_flight_ = false; // If we got a new status since we started the last send, start a // new send for it. if (pending_status_ != NOT_FOUND) { auto status = pending_status_; pending_status_ = NOT_FOUND; - SendHealthLocked(status); + SendHealthLocked(std::move(self), status); } } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - OnCancel() { - Finish(Status(StatusCode::UNKNOWN, "call cancelled by client")); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinish(std::shared_ptr self, const Status& status) { + if (finish_called_) return; + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); + if (service_->shutdown_) return; + SendFinishLocked(std::move(self), status); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() { - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch call finished (service_name: \"%s\", " - "watcher: %p).", - service_, service_name_.c_str(), this); - service_->database_->UnregisterWatch(service_name_, this); - { - grpc::internal::MutexLock lock(&service_->mu_); - if (--service_->num_watches_ == 0) { - service_->shutdown_condition_.Signal(); - } +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinishLocked(std::shared_ptr self, const Status& status) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Finish(status, &on_finish_done_); + finish_called_ = true; +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnFinishDone(std::shared_ptr self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call finished (service_name: \"%s\", " + "handler: %p).", + service_, service_name_.c_str(), this); } - // Free the initial ref from instantiation. - Unref(); + self.reset(); // To appease clang-tidy. +} + +// TODO(roth): This method currently assumes that there will be only one +// thread polling the cq and invoking the corresponding callbacks. If +// that changes, we will need to add synchronization here. +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnDoneNotified(std::shared_ptr self, bool ok) { + GPR_ASSERT(ok); + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call is notified done (handler: %p, " + "is_cancelled: %d).", + service_, this, static_cast(ctx_.IsCancelled())); + database_->UnregisterCallHandler(service_name_, self); + SendFinish(std::move(self), Status::CANCELLED); } } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index c9d48a53c88..0d3f79719fc 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -25,12 +25,14 @@ #include #include #include -#include +#include +#include +#include #include #include -#include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/thd.h" namespace grpc { @@ -43,52 +45,191 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Reactor for handling Watch streams. - class WatchReactor : public ServerWriteReactor, - public grpc_core::RefCounted { + // Base class for call handlers. + class CallHandler { public: - WatchReactor(HealthCheckServiceImpl* service, const ByteBuffer* request); + virtual ~CallHandler() = default; + virtual void SendHealth(std::shared_ptr self, + ServingStatus status) = 0; + }; + + HealthCheckServiceImpl(DefaultHealthCheckService* database, + std::unique_ptr cq); + + ~HealthCheckServiceImpl() override; - void SendHealth(ServingStatus status); + void StartServingThread(); - void OnWriteDone(bool ok) override; - void OnCancel() override; - void OnDone() override; + private: + // A tag that can be called with a bool argument. It's tailored for + // CallHandler's use. Before being used, it should be constructed with a + // method of CallHandler and a shared pointer to the handler. The + // shared pointer will be moved to the invoked function and the function + // can only be invoked once. That makes ref counting of the handler easier, + // because the shared pointer is not bound to the function and can be gone + // once the invoked function returns (if not used any more). + class CallableTag { + public: + using HandlerFunction = + std::function, bool)>; + + CallableTag() {} + + CallableTag(HandlerFunction func, std::shared_ptr handler) + : handler_function_(std::move(func)), handler_(std::move(handler)) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + } + + // Runs the tag. This should be called only once. The handler is no + // longer owned by this tag after this method is invoked. + void Run(bool ok) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + handler_function_(std::move(handler_), ok); + } + + // Releases and returns the shared pointer to the handler. + std::shared_ptr ReleaseHandler() { + return std::move(handler_); + } private: - void SendHealthLocked(ServingStatus status) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); + HandlerFunction handler_function_ = nullptr; + std::shared_ptr handler_; + }; + // Call handler for Check method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class CheckCallHandler : public CallHandler { + public: + // Instantiates a CheckCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // Not used for Check. + void SendHealth(std::shared_ptr /*self*/, + ServingStatus /*status*/) override {} + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr self, bool ok); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; HealthCheckServiceImpl* service_; - std::string service_name_; - ByteBuffer response_; - grpc::internal::Mutex mu_; - bool write_pending_ ABSL_GUARDED_BY(mu_) = false; - ServingStatus pending_status_ ABSL_GUARDED_BY(mu_) = NOT_FOUND; + ByteBuffer request_; + GenericServerAsyncResponseWriter writer_; + ServerContext ctx_; + + CallableTag next_; }; - explicit HealthCheckServiceImpl(DefaultHealthCheckService* database); + // Call handler for Watch method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class WatchCallHandler : public CallHandler { + public: + // Instantiates a WatchCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); - ~HealthCheckServiceImpl() override; + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); - private: - // Request handler for Check method. - static ServerUnaryReactor* HandleCheckRequest( - DefaultHealthCheckService* database, CallbackServerContext* context, - const ByteBuffer* request, ByteBuffer* response); + void SendHealth(std::shared_ptr self, + ServingStatus status) override; + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr self, bool ok); + + // Requires holding send_mu_. + void SendHealthLocked(std::shared_ptr self, + ServingStatus status); + + // When sending a health result finishes. + void OnSendHealthDone(std::shared_ptr self, bool ok); + + void SendFinish(std::shared_ptr self, const Status& status); + + // Requires holding service_->cq_shutdown_mu_. + void SendFinishLocked(std::shared_ptr self, + const Status& status); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr self, bool ok); + + // Called when AsyncNotifyWhenDone() notifies us. + void OnDoneNotified(std::shared_ptr self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + std::string service_name_; + GenericServerAsyncWriter stream_; + ServerContext ctx_; + + grpc_core::Mutex send_mu_; + bool send_in_flight_ = false; // Guarded by mu_. + ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. + + bool finish_called_ = false; + CallableTag next_; + CallableTag on_done_notified_; + CallableTag on_finish_done_; + }; + + // Handles the incoming requests and drives the completion queue in a loop. + static void Serve(void* arg); // Returns true on success. static bool DecodeRequest(const ByteBuffer& request, std::string* service_name); static bool EncodeResponse(ServingStatus status, ByteBuffer* response); + // Needed to appease Windows compilers, which don't seem to allow + // nested classes to access protected members in the parent's + // superclass. + using Service::RequestAsyncServerStreaming; + using Service::RequestAsyncUnary; + DefaultHealthCheckService* database_; + std::unique_ptr cq_; - grpc::internal::Mutex mu_; - grpc::internal::CondVar shutdown_condition_; - bool shutdown_ ABSL_GUARDED_BY(mu_) = false; - size_t num_watches_ ABSL_GUARDED_BY(mu_) = 0; + // To synchronize the operations related to shutdown state of cq_, so that + // we don't enqueue new tags into cq_ after it is already shut down. + grpc_core::Mutex cq_shutdown_mu_; + std::atomic_bool shutdown_{false}; + std::unique_ptr thread_; }; DefaultHealthCheckService(); @@ -100,7 +241,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { ServingStatus GetServingStatus(const std::string& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService(); + HealthCheckServiceImpl* GetHealthCheckService( + std::unique_ptr cq); private: // Stores the current serving status of a service and any call @@ -109,28 +251,31 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: void SetServingStatus(ServingStatus status); ServingStatus GetServingStatus() const { return status_; } - void AddWatch( - grpc_core::RefCountedPtr watcher); - void RemoveWatch(HealthCheckServiceImpl::WatchReactor* watcher); - bool Unused() const { return watchers_.empty() && status_ == NOT_FOUND; } + void AddCallHandler( + std::shared_ptr handler); + void RemoveCallHandler( + const std::shared_ptr& handler); + bool Unused() const { + return call_handlers_.empty() && status_ == NOT_FOUND; + } private: ServingStatus status_ = NOT_FOUND; - std::map> - watchers_; + std::set> + call_handlers_; }; - void RegisterWatch( + void RegisterCallHandler( const std::string& service_name, - grpc_core::RefCountedPtr watcher); + std::shared_ptr handler); - void UnregisterWatch(const std::string& service_name, - HealthCheckServiceImpl::WatchReactor* watcher); + void UnregisterCallHandler( + const std::string& service_name, + const std::shared_ptr& handler); - mutable grpc::internal::Mutex mu_; - bool shutdown_ ABSL_GUARDED_BY(&mu_) = false; - std::map services_map_ ABSL_GUARDED_BY(&mu_); + mutable grpc_core::Mutex mu_; + bool shutdown_ = false; // Guarded by mu_. + std::map services_map_; // Guarded by mu_. std::unique_ptr impl_; }; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 3698cbcd541..875893de74f 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -1031,7 +1031,6 @@ bool Server::RegisterService(const std::string* addr, grpc::Service* service) { has_callback_methods_ = true; grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::CompletionQueue* cq = CallbackCQ(); - grpc_server_register_completion_queue(server_, cq->cq(), nullptr); grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator( cq->cq(), method_registration_tag, [this, cq, method_value] { grpc_core::Server::RegisteredCallAllocation result; @@ -1123,12 +1122,25 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. + grpc::ServerCompletionQueue* health_check_cq = nullptr; + grpc::DefaultHealthCheckService::HealthCheckServiceImpl* + default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && grpc::DefaultHealthCheckServiceEnabled()) { - auto default_hc_service = absl::make_unique(); - auto* hc_service_impl = default_hc_service->GetHealthCheckService(); - health_check_service_ = std::move(default_hc_service); - RegisterService(nullptr, hc_service_impl); + auto* default_hc_service = new grpc::DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + // We create a non-polling CQ to avoid impacting application + // performance. This ensures that we don't introduce thread hops + // for application requests that wind up on this CQ, which is polled + // in its own thread. + health_check_cq = new grpc::ServerCompletionQueue( + GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); + grpc_server_register_completion_queue(server_, health_check_cq->cq(), + nullptr); + default_health_check_service_impl = + default_hc_service->GetHealthCheckService( + std::unique_ptr(health_check_cq)); + RegisterService(nullptr, default_health_check_service_impl); } for (auto& acceptor : acceptors_) { @@ -1175,6 +1187,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } + if (health_check_cq != nullptr) { + new UnimplementedAsyncRequest(this, health_check_cq); + } unknown_rpc_needed = false; } @@ -1191,6 +1206,10 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { value->Start(); } + if (default_health_check_service_impl != nullptr) { + default_health_check_service_impl->StartServingThread(); + } + for (auto& acceptor : acceptors_) { acceptor->Start(); }