diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 540700cf7c8..ce3222d2ac2 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -18,7 +18,6 @@ #include "src/cpp/server/health/default_health_check_service.h" -#include #include #include @@ -30,6 +29,7 @@ #include #include +#include #include #include #include @@ -50,7 +50,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() { void DefaultHealthCheckService::SetServingStatus( const std::string& service_name, bool serving) { - grpc_core::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); if (shutdown_) { // Set to NOT_SERVING in case service_name is not in the map. serving = false; @@ -60,10 +60,8 @@ void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(bool serving) { const ServingStatus status = serving ? SERVING : NOT_SERVING; - grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - return; - } + grpc::internal::MutexLock lock(&mu_); + if (shutdown_) return; for (auto& p : services_map_) { ServiceData& service_data = p.second; service_data.SetServingStatus(status); @@ -71,10 +69,8 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) { } void DefaultHealthCheckService::Shutdown() { - grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - return; - } + grpc::internal::MutexLock lock(&mu_); + if (shutdown_) return; shutdown_ = true; for (auto& p : services_map_) { ServiceData& service_data = p.second; @@ -85,43 +81,37 @@ void DefaultHealthCheckService::Shutdown() { DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( const std::string& service_name) const { - grpc_core::MutexLock lock(&mu_); + grpc::internal::MutexLock lock(&mu_); auto it = services_map_.find(service_name); - if (it == services_map_.end()) { - return NOT_FOUND; - } + if (it == services_map_.end()) return NOT_FOUND; const ServiceData& service_data = it->second; return service_data.GetServingStatus(); } -void DefaultHealthCheckService::RegisterCallHandler( +void DefaultHealthCheckService::RegisterWatch( const std::string& service_name, - std::shared_ptr handler) { - grpc_core::MutexLock lock(&mu_); + grpc_core::RefCountedPtr watcher) { + grpc::internal::MutexLock lock(&mu_); ServiceData& service_data = services_map_[service_name]; - service_data.AddCallHandler(handler /* copies ref */); - HealthCheckServiceImpl::CallHandler* h = handler.get(); - h->SendHealth(std::move(handler), service_data.GetServingStatus()); + watcher->SendHealth(service_data.GetServingStatus()); + service_data.AddWatch(std::move(watcher)); } -void DefaultHealthCheckService::UnregisterCallHandler( +void DefaultHealthCheckService::UnregisterWatch( const std::string& service_name, - const std::shared_ptr& handler) { - grpc_core::MutexLock lock(&mu_); + HealthCheckServiceImpl::WatchReactor* watcher) { + grpc::internal::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) return; ServiceData& service_data = it->second; - service_data.RemoveCallHandler(handler); - if (service_data.Unused()) { - services_map_.erase(it); - } + service_data.RemoveWatch(watcher); + if (service_data.Unused()) services_map_.erase(it); } DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService( - std::unique_ptr cq) { +DefaultHealthCheckService::GetHealthCheckService() { GPR_ASSERT(impl_ == nullptr); - impl_ = absl::make_unique(this, std::move(cq)); + impl_ = absl::make_unique(this); return impl_.get(); } @@ -132,19 +122,19 @@ DefaultHealthCheckService::GetHealthCheckService( void DefaultHealthCheckService::ServiceData::SetServingStatus( ServingStatus status) { status_ = status; - for (auto& call_handler : call_handlers_) { - call_handler->SendHealth(call_handler /* copies ref */, status); + for (const auto& p : watchers_) { + p.first->SendHealth(status); } } -void DefaultHealthCheckService::ServiceData::AddCallHandler( - std::shared_ptr handler) { - call_handlers_.insert(std::move(handler)); +void DefaultHealthCheckService::ServiceData::AddWatch( + grpc_core::RefCountedPtr watcher) { + watchers_[watcher.get()] = std::move(watcher); } -void DefaultHealthCheckService::ServiceData::RemoveCallHandler( - const std::shared_ptr& handler) { - call_handlers_.erase(handler); +void DefaultHealthCheckService::ServiceData::RemoveWatch( + HealthCheckServiceImpl::WatchReactor* watcher) { + watchers_.erase(watcher); } // @@ -157,53 +147,57 @@ const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* database, - std::unique_ptr cq) - : database_(database), cq_(std::move(cq)) { + DefaultHealthCheckService* database) + : database_(database) { // Add Check() method. AddMethod(new internal::RpcServiceMethod( kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); + MarkMethodCallback( + 0, new internal::CallbackUnaryHandler( + [database](CallbackServerContext* context, + const ByteBuffer* request, ByteBuffer* response) { + return HandleCheckRequest(database, context, request, response); + })); // Add Watch() method. AddMethod(new internal::RpcServiceMethod( kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); - // Create serving thread. - thread_ = absl::make_unique("grpc_health_check_service", - Serve, this); + MarkMethodCallback( + 1, new internal::CallbackServerStreamingHandler( + [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) { + return new WatchReactor(this, request); + })); } DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { - // We will reach here after the server starts shutting down. + grpc::internal::MutexLock lock(&mu_); shutdown_ = true; - { - grpc_core::MutexLock lock(&cq_shutdown_mu_); - cq_->Shutdown(); + while (num_watches_ > 0) { + shutdown_condition_.Wait(&mu_); } - thread_->Join(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { - // Request the calls we're interested in. - // We do this before starting the serving thread, so that we know it's - // done before server startup is complete. - CheckCallHandler::CreateAndStart(cq_.get(), database_, this); - WatchCallHandler::CreateAndStart(cq_.get(), database_, this); - // Start serving thread. - thread_->Start(); } -void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { - HealthCheckServiceImpl* service = static_cast(arg); - void* tag; - bool ok; - while (true) { - if (!service->cq_->Next(&tag, &ok)) { - // The completion queue is shutting down. - GPR_ASSERT(service->shutdown_); - break; - } - auto* next_step = static_cast(tag); - next_step->Run(ok); +ServerUnaryReactor* +DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest( + DefaultHealthCheckService* database, CallbackServerContext* context, + const ByteBuffer* request, ByteBuffer* response) { + auto* reactor = context->DefaultReactor(); + std::string service_name; + if (!DecodeRequest(*request, &service_name)) { + reactor->Finish( + Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); + return reactor; + } + ServingStatus serving_status = database->GetServingStatus(service_name); + if (serving_status == NOT_FOUND) { + reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown")); + return reactor; + } + if (!EncodeResponse(serving_status, response)) { + reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response")); + return reactor; } + reactor->Finish(Status::OK); + return reactor; } bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( @@ -254,245 +248,124 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( } // -// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler +// DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor // -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr self = - std::make_shared(cq, database, service); - CheckCallHandler* handler = static_cast(self.get()); +DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor( + HealthCheckServiceImpl* service, const ByteBuffer* request) + : service_(service) { { - grpc_core::MutexLock lock(&service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request a Check() call. - handler->next_ = - CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, - &handler->writer_, cq, cq, &handler->next_); + grpc::internal::MutexLock lock(&service_->mu_); + ++service_->num_watches_; } -} - -DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnCallReceived(std::shared_ptr self, bool ok) { - if (!ok) { - // The value of ok being false means that the server is shutting down. - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Process request. - gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, - this); - std::string service_name; - grpc::Status status = Status::OK; - ByteBuffer response; - if (!service_->DecodeRequest(request_, &service_name)) { - status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); - } else { - ServingStatus serving_status = database_->GetServingStatus(service_name); - if (serving_status == NOT_FOUND) { - status = Status(StatusCode::NOT_FOUND, "service name unknown"); - } else if (!service_->EncodeResponse(serving_status, &response)) { - status = Status(StatusCode::INTERNAL, "could not encode response"); - } - } - // Send response. - { - grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - next_ = - CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - if (status.ok()) { - writer_.Finish(response, status, &next_); - } else { - writer_.FinishWithError(status, &next_); - } - } - } -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnFinishDone(std::shared_ptr self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", - service_, this); - } - self.reset(); // To appease clang-tidy. -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr self = - std::make_shared(cq, database, service); - WatchCallHandler* handler = static_cast(self.get()); - { - grpc_core::MutexLock lock(&service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request AsyncNotifyWhenDone(). - handler->on_done_notified_ = - CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, - std::placeholders::_1, std::placeholders::_2), - self /* copies ref */); - handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); - // Request a Watch() call. - handler->next_ = - CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, - &handler->stream_, cq, cq, - &handler->next_); - } -} - -DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), stream_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnCallReceived(std::shared_ptr self, bool ok) { - if (!ok) { - // Server shutting down. - // - // AsyncNotifyWhenDone() needs to be called before the call starts, but the - // tag will not pop out if the call never starts ( - // https://github.com/grpc/grpc/issues/10136). So we need to manually - // release the ownership of the handler in this case. - GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Parse request. - if (!service_->DecodeRequest(request_, &service_name_)) { - SendFinish(std::move(self), - Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); + bool success = DecodeRequest(*request, &service_name_); + gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": watch call started", service_, + this, service_name_.c_str()); + if (!success) { + MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request")); return; } // Register the call for updates to the service. - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch started for service \"%s\" (handler: %p)", - service_, service_name_.c_str(), this); - database_->RegisterCallHandler(service_name_, std::move(self)); + service_->database_->RegisterWatch(service_name_, Ref()); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealth(std::shared_ptr self, ServingStatus status) { - grpc_core::MutexLock lock(&send_mu_); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: + SendHealth(ServingStatus status) { + gpr_log(GPR_DEBUG, + "[HCS %p] watcher %p \"%s\": SendHealth() for ServingStatus %d", + service_, this, service_name_.c_str(), status); + grpc::internal::MutexLock lock(&mu_); // If there's already a send in flight, cache the new status, and // we'll start a new send for it when the one in flight completes. - if (send_in_flight_) { + if (write_pending_) { + gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": queuing write", service_, + this, service_name_.c_str()); pending_status_ = status; return; } // Start a send. - SendHealthLocked(std::move(self), status); + SendHealthLocked(status); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealthLocked(std::shared_ptr self, ServingStatus status) { - send_in_flight_ = true; - // Construct response. - ByteBuffer response; - bool success = service_->EncodeResponse(status, &response); - // Grab shutdown lock and send response. - grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); - if (service_->shutdown_) { - SendFinishLocked(std::move(self), Status::CANCELLED); - return; +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: + SendHealthLocked(ServingStatus status) { + // Do nothing if Finish() has already been called. + if (finish_called_) return; + // Check if we're shutting down. + { + grpc::internal::MutexLock lock(&service_->mu_); + if (service_->shutdown_) { + MaybeFinishLocked( + Status(StatusCode::CANCELLED, "not writing due to shutdown")); + return; + } } + // Send response. + bool success = EncodeResponse(status, &response_); if (!success) { - SendFinishLocked(std::move(self), - Status(StatusCode::INTERNAL, "could not encode response")); + MaybeFinishLocked( + Status(StatusCode::INTERNAL, "could not encode response")); return; } - next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Write(response, &next_); + gpr_log(GPR_DEBUG, + "[HCS %p] watcher %p \"%s\": starting write for ServingStatus %d", + service_, this, service_name_.c_str(), status); + write_pending_ = true; + StartWrite(&response_); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnSendHealthDone(std::shared_ptr self, bool ok) { +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: + OnWriteDone(bool ok) { + gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnWriteDone(): ok=%d", + service_, this, service_name_.c_str(), ok); + response_.Clear(); + grpc::internal::MutexLock lock(&mu_); if (!ok) { - SendFinish(std::move(self), Status::CANCELLED); + MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false")); return; } - grpc_core::MutexLock lock(&send_mu_); - send_in_flight_ = false; + write_pending_ = false; // If we got a new status since we started the last send, start a // new send for it. if (pending_status_ != NOT_FOUND) { auto status = pending_status_; pending_status_ = NOT_FOUND; - SendHealthLocked(std::move(self), status); + SendHealthLocked(status); } } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendFinish(std::shared_ptr self, const Status& status) { - if (finish_called_) return; - grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); - if (service_->shutdown_) return; - SendFinishLocked(std::move(self), status); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendFinishLocked(std::shared_ptr self, const Status& status) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(status, &on_finish_done_); - finish_called_ = true; +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: + OnCancel() { + grpc::internal::MutexLock lock(&mu_); + MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()")); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnFinishDone(std::shared_ptr self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, - "[HCS %p] Health watch call finished (service_name: \"%s\", " - "handler: %p).", - service_, service_name_.c_str(), this); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() { + gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnDone()", service_, this, + service_name_.c_str()); + service_->database_->UnregisterWatch(service_name_, this); + { + grpc::internal::MutexLock lock(&service_->mu_); + if (--service_->num_watches_ == 0 && service_->shutdown_) { + service_->shutdown_condition_.Signal(); + } } - self.reset(); // To appease clang-tidy. + // Free the initial ref from instantiation. + Unref(); } -// TODO(roth): This method currently assumes that there will be only one -// thread polling the cq and invoking the corresponding callbacks. If -// that changes, we will need to add synchronization here. -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnDoneNotified(std::shared_ptr self, bool ok) { - GPR_ASSERT(ok); +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: + MaybeFinishLocked(Status status) { gpr_log(GPR_DEBUG, - "[HCS %p] Health watch call is notified done (handler: %p, " - "is_cancelled: %d).", - service_, this, static_cast(ctx_.IsCancelled())); - database_->UnregisterCallHandler(service_name_, self); - SendFinish(std::move(self), Status::CANCELLED); + "[HCS %p] watcher %p \"%s\": MaybeFinishLocked() with code=%d msg=%s", + service_, this, service_name_.c_str(), status.error_code(), + status.error_message().c_str()); + if (!finish_called_) { + gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": actually calling Finish()", + service_, this, service_name_.c_str()); + finish_called_ = true; + Finish(status); + } } } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index c28d96ddfbe..449134b0a2b 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -19,27 +19,25 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H -#include -#include +#include + #include #include -#include #include -#include -#include -#include +#include "absl/base/thread_annotations.h" + #include #include -#include // IWYU pragma: keep -#include // IWYU pragma: keep +#include #include #include #include +#include #include -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" namespace grpc { @@ -52,191 +50,55 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Base class for call handlers. - class CallHandler { + // Reactor for handling Watch streams. + class WatchReactor : public ServerWriteReactor, + public grpc_core::RefCounted { public: - virtual ~CallHandler() = default; - virtual void SendHealth(std::shared_ptr self, - ServingStatus status) = 0; - }; + WatchReactor(HealthCheckServiceImpl* service, const ByteBuffer* request); - HealthCheckServiceImpl(DefaultHealthCheckService* database, - std::unique_ptr cq); - - ~HealthCheckServiceImpl() override; + void SendHealth(ServingStatus status); - void StartServingThread(); - - private: - // A tag that can be called with a bool argument. It's tailored for - // CallHandler's use. Before being used, it should be constructed with a - // method of CallHandler and a shared pointer to the handler. The - // shared pointer will be moved to the invoked function and the function - // can only be invoked once. That makes ref counting of the handler easier, - // because the shared pointer is not bound to the function and can be gone - // once the invoked function returns (if not used any more). - class CallableTag { - public: - using HandlerFunction = - std::function, bool)>; - - CallableTag() {} - - CallableTag(HandlerFunction func, std::shared_ptr handler) - : handler_function_(std::move(func)), handler_(std::move(handler)) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - } - - // Runs the tag. This should be called only once. The handler is no - // longer owned by this tag after this method is invoked. - void Run(bool ok) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - handler_function_(std::move(handler_), ok); - } - - // Releases and returns the shared pointer to the handler. - std::shared_ptr ReleaseHandler() { - return std::move(handler_); - } + void OnWriteDone(bool ok) override; + void OnCancel() override; + void OnDone() override; private: - HandlerFunction handler_function_ = nullptr; - std::shared_ptr handler_; - }; + void SendHealthLocked(ServingStatus status) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); - // Call handler for Check method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class CheckCallHandler : public CallHandler { - public: - // Instantiates a CheckCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // Not used for Check. - void SendHealth(std::shared_ptr /*self*/, - ServingStatus /*status*/) override {} - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr self, bool ok); + void MaybeFinishLocked(Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; HealthCheckServiceImpl* service_; + std::string service_name_; + ByteBuffer response_; - ByteBuffer request_; - GenericServerAsyncResponseWriter writer_; - ServerContext ctx_; - - CallableTag next_; + grpc::internal::Mutex mu_; + bool write_pending_ ABSL_GUARDED_BY(mu_) = false; + ServingStatus pending_status_ ABSL_GUARDED_BY(mu_) = NOT_FOUND; + bool finish_called_ ABSL_GUARDED_BY(mu_) = false; }; - // Call handler for Watch method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class WatchCallHandler : public CallHandler { - public: - // Instantiates a WatchCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - void SendHealth(std::shared_ptr self, - ServingStatus status) override; - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr self, bool ok); - - // Requires holding send_mu_. - void SendHealthLocked(std::shared_ptr self, - ServingStatus status); - - // When sending a health result finishes. - void OnSendHealthDone(std::shared_ptr self, bool ok); - - void SendFinish(std::shared_ptr self, const Status& status); - - // Requires holding service_->cq_shutdown_mu_. - void SendFinishLocked(std::shared_ptr self, - const Status& status); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr self, bool ok); - - // Called when AsyncNotifyWhenDone() notifies us. - void OnDoneNotified(std::shared_ptr self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; + explicit HealthCheckServiceImpl(DefaultHealthCheckService* database); - ByteBuffer request_; - std::string service_name_; - GenericServerAsyncWriter stream_; - ServerContext ctx_; - - grpc_core::Mutex send_mu_; - bool send_in_flight_ = false; // Guarded by mu_. - ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. - - bool finish_called_ = false; - CallableTag next_; - CallableTag on_done_notified_; - CallableTag on_finish_done_; - }; + ~HealthCheckServiceImpl() override; - // Handles the incoming requests and drives the completion queue in a loop. - static void Serve(void* arg); + private: + // Request handler for Check method. + static ServerUnaryReactor* HandleCheckRequest( + DefaultHealthCheckService* database, CallbackServerContext* context, + const ByteBuffer* request, ByteBuffer* response); // Returns true on success. static bool DecodeRequest(const ByteBuffer& request, std::string* service_name); static bool EncodeResponse(ServingStatus status, ByteBuffer* response); - // Needed to appease Windows compilers, which don't seem to allow - // nested classes to access protected members in the parent's - // superclass. - using Service::RequestAsyncServerStreaming; - using Service::RequestAsyncUnary; - DefaultHealthCheckService* database_; - std::unique_ptr cq_; - // To synchronize the operations related to shutdown state of cq_, so that - // we don't enqueue new tags into cq_ after it is already shut down. - grpc_core::Mutex cq_shutdown_mu_; - std::atomic_bool shutdown_{false}; - std::unique_ptr thread_; + grpc::internal::Mutex mu_; + grpc::internal::CondVar shutdown_condition_; + bool shutdown_ ABSL_GUARDED_BY(mu_) = false; + size_t num_watches_ ABSL_GUARDED_BY(mu_) = 0; }; DefaultHealthCheckService(); @@ -248,8 +110,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { ServingStatus GetServingStatus(const std::string& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService( - std::unique_ptr cq); + HealthCheckServiceImpl* GetHealthCheckService(); private: // Stores the current serving status of a service and any call @@ -258,31 +119,28 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: void SetServingStatus(ServingStatus status); ServingStatus GetServingStatus() const { return status_; } - void AddCallHandler( - std::shared_ptr handler); - void RemoveCallHandler( - const std::shared_ptr& handler); - bool Unused() const { - return call_handlers_.empty() && status_ == NOT_FOUND; - } + void AddWatch( + grpc_core::RefCountedPtr watcher); + void RemoveWatch(HealthCheckServiceImpl::WatchReactor* watcher); + bool Unused() const { return watchers_.empty() && status_ == NOT_FOUND; } private: ServingStatus status_ = NOT_FOUND; - std::set> - call_handlers_; + std::map> + watchers_; }; - void RegisterCallHandler( + void RegisterWatch( const std::string& service_name, - std::shared_ptr handler); + grpc_core::RefCountedPtr watcher); - void UnregisterCallHandler( - const std::string& service_name, - const std::shared_ptr& handler); + void UnregisterWatch(const std::string& service_name, + HealthCheckServiceImpl::WatchReactor* watcher); - mutable grpc_core::Mutex mu_; - bool shutdown_ = false; // Guarded by mu_. - std::map services_map_; // Guarded by mu_. + mutable grpc::internal::Mutex mu_; + bool shutdown_ ABSL_GUARDED_BY(&mu_) = false; + std::map services_map_ ABSL_GUARDED_BY(&mu_); std::unique_ptr impl_; }; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 79eed95e8d8..9c49f73676f 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -1058,6 +1058,7 @@ bool Server::RegisterService(const std::string* addr, grpc::Service* service) { has_callback_methods_ = true; grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::CompletionQueue* cq = CallbackCQ(); + grpc_server_register_completion_queue(server_, cq->cq(), nullptr); grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator( cq->cq(), method_registration_tag, [this, cq, method_value] { grpc_core::Server::RegisteredCallAllocation result; @@ -1149,58 +1150,45 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. - grpc::ServerCompletionQueue* health_check_cq = nullptr; - grpc::DefaultHealthCheckService::HealthCheckServiceImpl* - default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && grpc::DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new grpc::DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - // We create a non-polling CQ to avoid impacting application - // performance. This ensures that we don't introduce thread hops - // for application requests that wind up on this CQ, which is polled - // in its own thread. - health_check_cq = new grpc::ServerCompletionQueue( - GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); - grpc_server_register_completion_queue(server_, health_check_cq->cq(), - nullptr); - default_health_check_service_impl = - default_hc_service->GetHealthCheckService( - std::unique_ptr(health_check_cq)); - RegisterService(nullptr, default_health_check_service_impl); + auto default_hc_service = absl::make_unique(); + auto* hc_service_impl = default_hc_service->GetHealthCheckService(); + health_check_service_ = std::move(default_hc_service); + RegisterService(nullptr, hc_service_impl); } for (auto& acceptor : acceptors_) { acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_); } - // If this server uses callback methods, then create a callback generic - // service to handle any unimplemented methods using the default reactor - // creator - if (has_callback_methods_ && !has_callback_generic_service_) { - unimplemented_service_ = absl::make_unique(); - RegisterCallbackGenericService(unimplemented_service_.get()); - } - #ifndef NDEBUG for (size_t i = 0; i < num_cqs; i++) { cq_list_.push_back(cqs[i]); } #endif - // If we have a generic service, all unmatched method names go there. - // Otherwise, we must provide at least one RPC request for an "unimplemented" - // RPC, which covers any RPC for a method name that isn't matched. If we - // have a sync service, let it be a sync unimplemented RPC, which must be - // registered before server start (to initialize an AllocatingRequestMatcher). - // If we have an AllocatingRequestMatcher, we can't also specify other - // unimplemented RPCs via explicit async requests, so we won't do so. If we - // only have async services, we can specify unimplemented RPCs on each async - // CQ so that some user polling thread will move them along as long as some - // progress is being made on any RPCs in the system. + // We must have exactly one generic service to handle requests for + // unmatched method names (i.e., to return UNIMPLEMENTED for any RPC + // method for which we don't have a registered implementation). This + // service comes from one of the following places (first match wins): + // - If the application supplied a generic service via either the async + // or callback APIs, we use that. + // - If there are callback methods, register a callback generic service. + // - If there are sync methods, register a sync generic service. + // (This must be done before server start to initialize an + // AllocatingRequestMatcher.) + // - Otherwise (we have only async methods), we wait until the server + // is started and then start an UnimplementedAsyncRequest on each + // async CQ, so that the requests will be moved along by polling + // done in application threads. bool unknown_rpc_needed = !has_async_generic_service_ && !has_callback_generic_service_; - + if (unknown_rpc_needed && has_callback_methods_) { + unimplemented_service_ = absl::make_unique(); + RegisterCallbackGenericService(unimplemented_service_.get()); + unknown_rpc_needed = false; + } if (unknown_rpc_needed && !sync_req_mgrs_.empty()) { sync_req_mgrs_[0]->AddUnknownSyncMethod(); unknown_rpc_needed = false; @@ -1214,9 +1202,6 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } - if (health_check_cq != nullptr) { - new UnimplementedAsyncRequest(this, health_check_cq); - } unknown_rpc_needed = false; } @@ -1233,10 +1218,6 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { value->Start(); } - if (default_health_check_service_impl != nullptr) { - default_health_check_service_impl->StartServingThread(); - } - for (auto& acceptor : acceptors_) { acceptor->Start(); }