diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index ce3222d2ac2..540700cf7c8 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -18,6 +18,7 @@ #include "src/cpp/server/health/default_health_check_service.h" +#include #include #include @@ -29,7 +30,6 @@ #include #include -#include #include #include #include @@ -50,7 +50,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() { void DefaultHealthCheckService::SetServingStatus( const std::string& service_name, bool serving) { - grpc::internal::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { // Set to NOT_SERVING in case service_name is not in the map. serving = false; @@ -60,8 +60,10 @@ void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(bool serving) { const ServingStatus status = serving ? SERVING : NOT_SERVING; - grpc::internal::MutexLock lock(&mu_); - if (shutdown_) return; + grpc_core::MutexLock lock(&mu_); + if (shutdown_) { + return; + } for (auto& p : services_map_) { ServiceData& service_data = p.second; service_data.SetServingStatus(status); @@ -69,8 +71,10 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) { } void DefaultHealthCheckService::Shutdown() { - grpc::internal::MutexLock lock(&mu_); - if (shutdown_) return; + grpc_core::MutexLock lock(&mu_); + if (shutdown_) { + return; + } shutdown_ = true; for (auto& p : services_map_) { ServiceData& service_data = p.second; @@ -81,37 +85,43 @@ void DefaultHealthCheckService::Shutdown() { DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( const std::string& service_name) const { - grpc::internal::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); - if (it == services_map_.end()) return NOT_FOUND; + if (it == services_map_.end()) { + return NOT_FOUND; + } const ServiceData& service_data = it->second; return service_data.GetServingStatus(); } -void DefaultHealthCheckService::RegisterWatch( +void DefaultHealthCheckService::RegisterCallHandler( const std::string& service_name, - grpc_core::RefCountedPtr watcher) { - grpc::internal::MutexLock lock(&mu_); + std::shared_ptr handler) { + grpc_core::MutexLock lock(&mu_); ServiceData& service_data = services_map_[service_name]; - watcher->SendHealth(service_data.GetServingStatus()); - service_data.AddWatch(std::move(watcher)); + service_data.AddCallHandler(handler /* copies ref */); + HealthCheckServiceImpl::CallHandler* h = handler.get(); + h->SendHealth(std::move(handler), service_data.GetServingStatus()); } -void DefaultHealthCheckService::UnregisterWatch( +void DefaultHealthCheckService::UnregisterCallHandler( const std::string& service_name, - HealthCheckServiceImpl::WatchReactor* watcher) { - grpc::internal::MutexLock lock(&mu_); + const std::shared_ptr& handler) { + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) return; ServiceData& service_data = it->second; - service_data.RemoveWatch(watcher); - if (service_data.Unused()) services_map_.erase(it); + service_data.RemoveCallHandler(handler); + if (service_data.Unused()) { + services_map_.erase(it); + } } DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService() { +DefaultHealthCheckService::GetHealthCheckService( + std::unique_ptr cq) { GPR_ASSERT(impl_ == nullptr); - impl_ = absl::make_unique(this); + impl_ = absl::make_unique(this, std::move(cq)); return impl_.get(); } @@ -122,19 +132,19 @@ DefaultHealthCheckService::GetHealthCheckService() { void DefaultHealthCheckService::ServiceData::SetServingStatus( ServingStatus status) { status_ = status; - for (const auto& p : watchers_) { - p.first->SendHealth(status); + for (auto& call_handler : call_handlers_) { + call_handler->SendHealth(call_handler /* copies ref */, status); } } -void DefaultHealthCheckService::ServiceData::AddWatch( - grpc_core::RefCountedPtr watcher) { - watchers_[watcher.get()] = std::move(watcher); +void DefaultHealthCheckService::ServiceData::AddCallHandler( + std::shared_ptr handler) { + call_handlers_.insert(std::move(handler)); } -void DefaultHealthCheckService::ServiceData::RemoveWatch( - HealthCheckServiceImpl::WatchReactor* watcher) { - watchers_.erase(watcher); +void DefaultHealthCheckService::ServiceData::RemoveCallHandler( + const std::shared_ptr& handler) { + call_handlers_.erase(handler); } // @@ -147,57 +157,53 @@ const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* database) - : database_(database) { + DefaultHealthCheckService* database, + std::unique_ptr cq) + : database_(database), cq_(std::move(cq)) { // Add Check() method. AddMethod(new internal::RpcServiceMethod( kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); - MarkMethodCallback( - 0, new internal::CallbackUnaryHandler( - [database](CallbackServerContext* context, - const ByteBuffer* request, ByteBuffer* response) { - return HandleCheckRequest(database, context, request, response); - })); // Add Watch() method. AddMethod(new internal::RpcServiceMethod( kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); - MarkMethodCallback( - 1, new internal::CallbackServerStreamingHandler( - [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) { - return new WatchReactor(this, request); - })); + // Create serving thread. + thread_ = absl::make_unique("grpc_health_check_service", + Serve, this); } DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { - grpc::internal::MutexLock lock(&mu_); + // We will reach here after the server starts shutting down. shutdown_ = true; - while (num_watches_ > 0) { - shutdown_condition_.Wait(&mu_); + { + grpc_core::MutexLock lock(&cq_shutdown_mu_); + cq_->Shutdown(); } + thread_->Join(); } -ServerUnaryReactor* -DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest( - DefaultHealthCheckService* database, CallbackServerContext* context, - const ByteBuffer* request, ByteBuffer* response) { - auto* reactor = context->DefaultReactor(); - std::string service_name; - if (!DecodeRequest(*request, &service_name)) { - reactor->Finish( - Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); - return reactor; - } - ServingStatus serving_status = database->GetServingStatus(service_name); - if (serving_status == NOT_FOUND) { - reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown")); - return reactor; - } - if (!EncodeResponse(serving_status, response)) { - reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response")); - return reactor; +void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { + // Request the calls we're interested in. + // We do this before starting the serving thread, so that we know it's + // done before server startup is complete. + CheckCallHandler::CreateAndStart(cq_.get(), database_, this); + WatchCallHandler::CreateAndStart(cq_.get(), database_, this); + // Start serving thread. + thread_->Start(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { + HealthCheckServiceImpl* service = static_cast(arg); + void* tag; + bool ok; + while (true) { + if (!service->cq_->Next(&tag, &ok)) { + // The completion queue is shutting down. + GPR_ASSERT(service->shutdown_); + break; + } + auto* next_step = static_cast(tag); + next_step->Run(ok); } - reactor->Finish(Status::OK); - return reactor; } bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( @@ -248,124 +254,245 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( } // -// DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor +// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler // -DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor( - HealthCheckServiceImpl* service, const ByteBuffer* request) - : service_(service) { +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr self = + std::make_shared(cq, database, service); + CheckCallHandler* handler = static_cast(self.get()); { - grpc::internal::MutexLock lock(&service_->mu_); - ++service_->num_watches_; + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); + if (service->shutdown_) return; + // Request a Check() call. + handler->next_ = + CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, + &handler->writer_, cq, cq, &handler->next_); } - bool success = DecodeRequest(*request, &service_name_); - gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": watch call started", service_, - this, service_name_.c_str()); - if (!success) { - MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request")); +} + +DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnCallReceived(std::shared_ptr self, bool ok) { + if (!ok) { + // The value of ok being false means that the server is shutting down. return; } - // Register the call for updates to the service. - service_->database_->RegisterWatch(service_name_, Ref()); + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Process request. + gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, + this); + std::string service_name; + grpc::Status status = Status::OK; + ByteBuffer response; + if (!service_->DecodeRequest(request_, &service_name)) { + status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); + } else { + ServingStatus serving_status = database_->GetServingStatus(service_name); + if (serving_status == NOT_FOUND) { + status = Status(StatusCode::NOT_FOUND, "service name unknown"); + } else if (!service_->EncodeResponse(serving_status, &response)) { + status = Status(StatusCode::INTERNAL, "could not encode response"); + } + } + // Send response. + { + grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); + if (!service_->shutdown_) { + next_ = + CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + if (status.ok()) { + writer_.Finish(response, status, &next_); + } else { + writer_.FinishWithError(status, &next_); + } + } + } } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - SendHealth(ServingStatus status) { +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnFinishDone(std::shared_ptr self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", + service_, this); + } + self.reset(); // To appease clang-tidy. +} + +// +// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr self = + std::make_shared(cq, database, service); + WatchCallHandler* handler = static_cast(self.get()); + { + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); + if (service->shutdown_) return; + // Request AsyncNotifyWhenDone(). + handler->on_done_notified_ = + CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, + std::placeholders::_1, std::placeholders::_2), + self /* copies ref */); + handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); + // Request a Watch() call. + handler->next_ = + CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, + &handler->stream_, cq, cq, + &handler->next_); + } +} + +DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), stream_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnCallReceived(std::shared_ptr self, bool ok) { + if (!ok) { + // Server shutting down. + // + // AsyncNotifyWhenDone() needs to be called before the call starts, but the + // tag will not pop out if the call never starts ( + // https://github.com/grpc/grpc/issues/10136). So we need to manually + // release the ownership of the handler in this case. + GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Parse request. + if (!service_->DecodeRequest(request_, &service_name_)) { + SendFinish(std::move(self), + Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); + return; + } + // Register the call for updates to the service. gpr_log(GPR_DEBUG, - "[HCS %p] watcher %p \"%s\": SendHealth() for ServingStatus %d", - service_, this, service_name_.c_str(), status); - grpc::internal::MutexLock lock(&mu_); + "[HCS %p] Health watch started for service \"%s\" (handler: %p)", + service_, service_name_.c_str(), this); + database_->RegisterCallHandler(service_name_, std::move(self)); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealth(std::shared_ptr self, ServingStatus status) { + grpc_core::MutexLock lock(&send_mu_); // If there's already a send in flight, cache the new status, and // we'll start a new send for it when the one in flight completes. - if (write_pending_) { - gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": queuing write", service_, - this, service_name_.c_str()); + if (send_in_flight_) { pending_status_ = status; return; } // Start a send. - SendHealthLocked(status); + SendHealthLocked(std::move(self), status); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - SendHealthLocked(ServingStatus status) { - // Do nothing if Finish() has already been called. - if (finish_called_) return; - // Check if we're shutting down. - { - grpc::internal::MutexLock lock(&service_->mu_); - if (service_->shutdown_) { - MaybeFinishLocked( - Status(StatusCode::CANCELLED, "not writing due to shutdown")); - return; - } +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealthLocked(std::shared_ptr self, ServingStatus status) { + send_in_flight_ = true; + // Construct response. + ByteBuffer response; + bool success = service_->EncodeResponse(status, &response); + // Grab shutdown lock and send response. + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); + if (service_->shutdown_) { + SendFinishLocked(std::move(self), Status::CANCELLED); + return; } - // Send response. - bool success = EncodeResponse(status, &response_); if (!success) { - MaybeFinishLocked( - Status(StatusCode::INTERNAL, "could not encode response")); + SendFinishLocked(std::move(self), + Status(StatusCode::INTERNAL, "could not encode response")); return; } - gpr_log(GPR_DEBUG, - "[HCS %p] watcher %p \"%s\": starting write for ServingStatus %d", - service_, this, service_name_.c_str(), status); - write_pending_ = true; - StartWrite(&response_); + next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Write(response, &next_); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - OnWriteDone(bool ok) { - gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnWriteDone(): ok=%d", - service_, this, service_name_.c_str(), ok); - response_.Clear(); - grpc::internal::MutexLock lock(&mu_); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnSendHealthDone(std::shared_ptr self, bool ok) { if (!ok) { - MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false")); + SendFinish(std::move(self), Status::CANCELLED); return; } - write_pending_ = false; + grpc_core::MutexLock lock(&send_mu_); + send_in_flight_ = false; // If we got a new status since we started the last send, start a // new send for it. if (pending_status_ != NOT_FOUND) { auto status = pending_status_; pending_status_ = NOT_FOUND; - SendHealthLocked(status); + SendHealthLocked(std::move(self), status); } } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - OnCancel() { - grpc::internal::MutexLock lock(&mu_); - MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()")); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinish(std::shared_ptr self, const Status& status) { + if (finish_called_) return; + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); + if (service_->shutdown_) return; + SendFinishLocked(std::move(self), status); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() { - gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnDone()", service_, this, - service_name_.c_str()); - service_->database_->UnregisterWatch(service_name_, this); - { - grpc::internal::MutexLock lock(&service_->mu_); - if (--service_->num_watches_ == 0 && service_->shutdown_) { - service_->shutdown_condition_.Signal(); - } +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendFinishLocked(std::shared_ptr self, const Status& status) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Finish(status, &on_finish_done_); + finish_called_ = true; +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnFinishDone(std::shared_ptr self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call finished (service_name: \"%s\", " + "handler: %p).", + service_, service_name_.c_str(), this); } - // Free the initial ref from instantiation. - Unref(); + self.reset(); // To appease clang-tidy. } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: - MaybeFinishLocked(Status status) { +// TODO(roth): This method currently assumes that there will be only one +// thread polling the cq and invoking the corresponding callbacks. If +// that changes, we will need to add synchronization here. +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnDoneNotified(std::shared_ptr self, bool ok) { + GPR_ASSERT(ok); gpr_log(GPR_DEBUG, - "[HCS %p] watcher %p \"%s\": MaybeFinishLocked() with code=%d msg=%s", - service_, this, service_name_.c_str(), status.error_code(), - status.error_message().c_str()); - if (!finish_called_) { - gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": actually calling Finish()", - service_, this, service_name_.c_str()); - finish_called_ = true; - Finish(status); - } + "[HCS %p] Health watch call is notified done (handler: %p, " + "is_cancelled: %d).", + service_, this, static_cast(ctx_.IsCancelled())); + database_->UnregisterCallHandler(service_name_, self); + SendFinish(std::move(self), Status::CANCELLED); } } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 449134b0a2b..c28d96ddfbe 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -19,25 +19,27 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H -#include - +#include +#include #include #include +#include #include +#include -#include "absl/base/thread_annotations.h" - +#include +#include #include #include -#include +#include // IWYU pragma: keep +#include // IWYU pragma: keep #include #include #include -#include #include -#include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/thd.h" namespace grpc { @@ -50,55 +52,191 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Reactor for handling Watch streams. - class WatchReactor : public ServerWriteReactor, - public grpc_core::RefCounted { + // Base class for call handlers. + class CallHandler { public: - WatchReactor(HealthCheckServiceImpl* service, const ByteBuffer* request); + virtual ~CallHandler() = default; + virtual void SendHealth(std::shared_ptr self, + ServingStatus status) = 0; + }; - void SendHealth(ServingStatus status); + HealthCheckServiceImpl(DefaultHealthCheckService* database, + std::unique_ptr cq); + + ~HealthCheckServiceImpl() override; - void OnWriteDone(bool ok) override; - void OnCancel() override; - void OnDone() override; + void StartServingThread(); + + private: + // A tag that can be called with a bool argument. It's tailored for + // CallHandler's use. Before being used, it should be constructed with a + // method of CallHandler and a shared pointer to the handler. The + // shared pointer will be moved to the invoked function and the function + // can only be invoked once. That makes ref counting of the handler easier, + // because the shared pointer is not bound to the function and can be gone + // once the invoked function returns (if not used any more). + class CallableTag { + public: + using HandlerFunction = + std::function, bool)>; + + CallableTag() {} + + CallableTag(HandlerFunction func, std::shared_ptr handler) + : handler_function_(std::move(func)), handler_(std::move(handler)) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + } + + // Runs the tag. This should be called only once. The handler is no + // longer owned by this tag after this method is invoked. + void Run(bool ok) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + handler_function_(std::move(handler_), ok); + } + + // Releases and returns the shared pointer to the handler. + std::shared_ptr ReleaseHandler() { + return std::move(handler_); + } private: - void SendHealthLocked(ServingStatus status) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); + HandlerFunction handler_function_ = nullptr; + std::shared_ptr handler_; + }; - void MaybeFinishLocked(Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); + // Call handler for Check method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class CheckCallHandler : public CallHandler { + public: + // Instantiates a CheckCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // Not used for Check. + void SendHealth(std::shared_ptr /*self*/, + ServingStatus /*status*/) override {} + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr self, bool ok); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr self, bool ok); + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; HealthCheckServiceImpl* service_; - std::string service_name_; - ByteBuffer response_; - grpc::internal::Mutex mu_; - bool write_pending_ ABSL_GUARDED_BY(mu_) = false; - ServingStatus pending_status_ ABSL_GUARDED_BY(mu_) = NOT_FOUND; - bool finish_called_ ABSL_GUARDED_BY(mu_) = false; + ByteBuffer request_; + GenericServerAsyncResponseWriter writer_; + ServerContext ctx_; + + CallableTag next_; }; - explicit HealthCheckServiceImpl(DefaultHealthCheckService* database); + // Call handler for Watch method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class WatchCallHandler : public CallHandler { + public: + // Instantiates a WatchCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + void SendHealth(std::shared_ptr self, + ServingStatus status) override; - ~HealthCheckServiceImpl() override; + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr self, bool ok); - private: - // Request handler for Check method. - static ServerUnaryReactor* HandleCheckRequest( - DefaultHealthCheckService* database, CallbackServerContext* context, - const ByteBuffer* request, ByteBuffer* response); + // Requires holding send_mu_. + void SendHealthLocked(std::shared_ptr self, + ServingStatus status); + + // When sending a health result finishes. + void OnSendHealthDone(std::shared_ptr self, bool ok); + + void SendFinish(std::shared_ptr self, const Status& status); + + // Requires holding service_->cq_shutdown_mu_. + void SendFinishLocked(std::shared_ptr self, + const Status& status); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr self, bool ok); + + // Called when AsyncNotifyWhenDone() notifies us. + void OnDoneNotified(std::shared_ptr self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + std::string service_name_; + GenericServerAsyncWriter stream_; + ServerContext ctx_; + + grpc_core::Mutex send_mu_; + bool send_in_flight_ = false; // Guarded by mu_. + ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. + + bool finish_called_ = false; + CallableTag next_; + CallableTag on_done_notified_; + CallableTag on_finish_done_; + }; + + // Handles the incoming requests and drives the completion queue in a loop. + static void Serve(void* arg); // Returns true on success. static bool DecodeRequest(const ByteBuffer& request, std::string* service_name); static bool EncodeResponse(ServingStatus status, ByteBuffer* response); + // Needed to appease Windows compilers, which don't seem to allow + // nested classes to access protected members in the parent's + // superclass. + using Service::RequestAsyncServerStreaming; + using Service::RequestAsyncUnary; + DefaultHealthCheckService* database_; + std::unique_ptr cq_; - grpc::internal::Mutex mu_; - grpc::internal::CondVar shutdown_condition_; - bool shutdown_ ABSL_GUARDED_BY(mu_) = false; - size_t num_watches_ ABSL_GUARDED_BY(mu_) = 0; + // To synchronize the operations related to shutdown state of cq_, so that + // we don't enqueue new tags into cq_ after it is already shut down. + grpc_core::Mutex cq_shutdown_mu_; + std::atomic_bool shutdown_{false}; + std::unique_ptr thread_; }; DefaultHealthCheckService(); @@ -110,7 +248,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { ServingStatus GetServingStatus(const std::string& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService(); + HealthCheckServiceImpl* GetHealthCheckService( + std::unique_ptr cq); private: // Stores the current serving status of a service and any call @@ -119,28 +258,31 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: void SetServingStatus(ServingStatus status); ServingStatus GetServingStatus() const { return status_; } - void AddWatch( - grpc_core::RefCountedPtr watcher); - void RemoveWatch(HealthCheckServiceImpl::WatchReactor* watcher); - bool Unused() const { return watchers_.empty() && status_ == NOT_FOUND; } + void AddCallHandler( + std::shared_ptr handler); + void RemoveCallHandler( + const std::shared_ptr& handler); + bool Unused() const { + return call_handlers_.empty() && status_ == NOT_FOUND; + } private: ServingStatus status_ = NOT_FOUND; - std::map> - watchers_; + std::set> + call_handlers_; }; - void RegisterWatch( + void RegisterCallHandler( const std::string& service_name, - grpc_core::RefCountedPtr watcher); + std::shared_ptr handler); - void UnregisterWatch(const std::string& service_name, - HealthCheckServiceImpl::WatchReactor* watcher); + void UnregisterCallHandler( + const std::string& service_name, + const std::shared_ptr& handler); - mutable grpc::internal::Mutex mu_; - bool shutdown_ ABSL_GUARDED_BY(&mu_) = false; - std::map services_map_ ABSL_GUARDED_BY(&mu_); + mutable grpc_core::Mutex mu_; + bool shutdown_ = false; // Guarded by mu_. + std::map services_map_; // Guarded by mu_. std::unique_ptr impl_; }; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 9c49f73676f..79eed95e8d8 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -1058,7 +1058,6 @@ bool Server::RegisterService(const std::string* addr, grpc::Service* service) { has_callback_methods_ = true; grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::CompletionQueue* cq = CallbackCQ(); - grpc_server_register_completion_queue(server_, cq->cq(), nullptr); grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator( cq->cq(), method_registration_tag, [this, cq, method_value] { grpc_core::Server::RegisteredCallAllocation result; @@ -1150,45 +1149,58 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. + grpc::ServerCompletionQueue* health_check_cq = nullptr; + grpc::DefaultHealthCheckService::HealthCheckServiceImpl* + default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && grpc::DefaultHealthCheckServiceEnabled()) { - auto default_hc_service = absl::make_unique(); - auto* hc_service_impl = default_hc_service->GetHealthCheckService(); - health_check_service_ = std::move(default_hc_service); - RegisterService(nullptr, hc_service_impl); + auto* default_hc_service = new grpc::DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + // We create a non-polling CQ to avoid impacting application + // performance. This ensures that we don't introduce thread hops + // for application requests that wind up on this CQ, which is polled + // in its own thread. + health_check_cq = new grpc::ServerCompletionQueue( + GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); + grpc_server_register_completion_queue(server_, health_check_cq->cq(), + nullptr); + default_health_check_service_impl = + default_hc_service->GetHealthCheckService( + std::unique_ptr(health_check_cq)); + RegisterService(nullptr, default_health_check_service_impl); } for (auto& acceptor : acceptors_) { acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); } + // If this server uses callback methods, then create a callback generic + // service to handle any unimplemented methods using the default reactor + // creator + if (has_callback_methods_ && !has_callback_generic_service_) { + unimplemented_service_ = absl::make_unique(); + RegisterCallbackGenericService(unimplemented_service_.get()); + } + #ifndef NDEBUG for (size_t i = 0; i < num_cqs; i++) { cq_list_.push_back(cqs[i]); } #endif - // We must have exactly one generic service to handle requests for - // unmatched method names (i.e., to return UNIMPLEMENTED for any RPC - // method for which we don't have a registered implementation). This - // service comes from one of the following places (first match wins): - // - If the application supplied a generic service via either the async - // or callback APIs, we use that. - // - If there are callback methods, register a callback generic service. - // - If there are sync methods, register a sync generic service. - // (This must be done before server start to initialize an - // AllocatingRequestMatcher.) - // - Otherwise (we have only async methods), we wait until the server - // is started and then start an UnimplementedAsyncRequest on each - // async CQ, so that the requests will be moved along by polling - // done in application threads. + // If we have a generic service, all unmatched method names go there. + // Otherwise, we must provide at least one RPC request for an "unimplemented" + // RPC, which covers any RPC for a method name that isn't matched. If we + // have a sync service, let it be a sync unimplemented RPC, which must be + // registered before server start (to initialize an AllocatingRequestMatcher). + // If we have an AllocatingRequestMatcher, we can't also specify other + // unimplemented RPCs via explicit async requests, so we won't do so. If we + // only have async services, we can specify unimplemented RPCs on each async + // CQ so that some user polling thread will move them along as long as some + // progress is being made on any RPCs in the system. bool unknown_rpc_needed = !has_async_generic_service_ && !has_callback_generic_service_; - if (unknown_rpc_needed && has_callback_methods_) { - unimplemented_service_ = absl::make_unique(); - RegisterCallbackGenericService(unimplemented_service_.get()); - unknown_rpc_needed = false; - } + if (unknown_rpc_needed && !sync_req_mgrs_.empty()) { sync_req_mgrs_[0]->AddUnknownSyncMethod(); unknown_rpc_needed = false; @@ -1202,6 +1214,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } + if (health_check_cq != nullptr) { + new UnimplementedAsyncRequest(this, health_check_cq); + } unknown_rpc_needed = false; } @@ -1218,6 +1233,10 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { value->Start(); } + if (default_health_check_service_impl != nullptr) { + default_health_check_service_impl->StartServingThread(); + } + for (auto& acceptor : acceptors_) { acceptor->Start(); }