diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 6c8428ebde6..3f7d4fb765f 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -384,7 +384,6 @@ class ServerCompletionQueue : public CompletionQueue { grpc_cq_polling_type polling_type_; friend class ServerBuilder; - friend class Server; }; } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index fc3db1bba73..bfda67d0864 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -30,162 +30,29 @@ #include "src/cpp/server/health/health.pb.h" namespace grpc { - -// -// DefaultHealthCheckService -// - -DefaultHealthCheckService::DefaultHealthCheckService() { - services_map_[""].SetServingStatus(SERVING); -} - -void DefaultHealthCheckService::SetServingStatus( - const grpc::string& service_name, bool serving) { - std::unique_lock lock(mu_); - services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); -} - -void DefaultHealthCheckService::SetServingStatus(bool serving) { - const ServingStatus status = serving ? SERVING : NOT_SERVING; - std::unique_lock lock(mu_); - for (auto& p : services_map_) { - ServiceData& service_data = p.second; - service_data.SetServingStatus(status); - } -} - -DefaultHealthCheckService::ServingStatus -DefaultHealthCheckService::GetServingStatus( - const grpc::string& service_name) const { - std::lock_guard lock(mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) { - return NOT_FOUND; - } - const ServiceData& service_data = it->second; - return service_data.GetServingStatus(); -} - -void DefaultHealthCheckService::RegisterCallHandler( - const grpc::string& service_name, - std::shared_ptr handler) { - std::unique_lock lock(mu_); - ServiceData& service_data = services_map_[service_name]; - service_data.AddCallHandler(handler /* copies ref */); - handler->SendHealth(std::move(handler), service_data.GetServingStatus()); -} - -void DefaultHealthCheckService::UnregisterCallHandler( - const grpc::string& service_name, - std::shared_ptr handler) { - std::unique_lock lock(mu_); - auto it = services_map_.find(service_name); - if (it == services_map_.end()) return; - ServiceData& service_data = it->second; - service_data.RemoveCallHandler(std::move(handler)); - if (service_data.Unused()) { - services_map_.erase(it); - } -} - -DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService( - std::unique_ptr cq) { - GPR_ASSERT(impl_ == nullptr); - impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); - return impl_.get(); -} - -// -// DefaultHealthCheckService::ServiceData -// - -void DefaultHealthCheckService::ServiceData::SetServingStatus( - ServingStatus status) { - status_ = status; - for (auto& call_handler : call_handlers_) { - call_handler->SendHealth(call_handler /* copies ref */, status); - } -} - -void DefaultHealthCheckService::ServiceData::AddCallHandler( - std::shared_ptr handler) { - call_handlers_.insert(std::move(handler)); -} - -void DefaultHealthCheckService::ServiceData::RemoveCallHandler( - std::shared_ptr handler) { - call_handlers_.erase(std::move(handler)); -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl -// - namespace { const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; -const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* database, - std::unique_ptr cq) - : database_(database), cq_(std::move(cq)) { - // Add Check() method. - check_method_ = new internal::RpcServiceMethod( - kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr); - AddMethod(check_method_); - // Add Watch() method. - watch_method_ = new internal::RpcServiceMethod( - kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr); - AddMethod(watch_method_); - // Create serving thread. - thread_ = std::unique_ptr<::grpc_core::Thread>( - new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); -} - -DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { - // We will reach here after the server starts shutting down. - shutdown_ = true; - { - std::unique_lock lock(cq_shutdown_mu_); - cq_->Shutdown(); - } - thread_->Join(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { - thread_->Start(); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { - HealthCheckServiceImpl* service = - reinterpret_cast(arg); - // TODO(juanlishen): This is a workaround to wait for the cq to be ready. - // Need to figure out why cq is not ready after service starts. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(1, GPR_TIMESPAN))); - CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_, - service); - WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_, - service); - void* tag; - bool ok; - while (true) { - if (!service->cq_->Next(&tag, &ok)) { - // The completion queue is shutting down. - GPR_ASSERT(service->shutdown_); - break; - } - auto* next_step = static_cast(tag); - next_step->Run(ok); - } -} - -bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( - const ByteBuffer& request, grpc::string* service_name) { + DefaultHealthCheckService* service) + : service_(service), method_(nullptr) { + internal::MethodHandler* handler = + new internal::RpcMethodHandler( + std::mem_fn(&HealthCheckServiceImpl::Check), this); + method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); + AddMethod(method_); +} + +Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( + ServerContext* context, const ByteBuffer* request, ByteBuffer* response) { + // Decode request. std::vector slices; - if (!request.Dump(&slices).ok()) return false; + if (!request->Dump(&slices).ok()) { + return Status(StatusCode::INVALID_ARGUMENT, ""); + } uint8_t* request_bytes = nullptr; bool request_bytes_owned = false; size_t request_size = 0; @@ -197,13 +64,14 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( request_size = slices[0].size(); } else { request_bytes_owned = true; - request_bytes = static_cast(gpr_malloc(request.Length())); + request_bytes = static_cast(gpr_malloc(request->Length())); uint8_t* copy_to = request_bytes; for (size_t i = 0; i < slices.size(); i++) { memcpy(copy_to, slices[i].begin(), slices[i].size()); copy_to += slices[i].size(); } } + if (request_bytes != nullptr) { pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size); bool decode_status = pb_decode( @@ -211,22 +79,26 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( if (request_bytes_owned) { gpr_free(request_bytes); } - if (!decode_status) return false; + if (!decode_status) { + return Status(StatusCode::INVALID_ARGUMENT, ""); + } } - *service_name = request_struct.has_service ? request_struct.service : ""; - return true; -} -bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( - ServingStatus status, ByteBuffer* response) { + // Check status from the associated default health checking service. + DefaultHealthCheckService::ServingStatus serving_status = + service_->GetServingStatus( + request_struct.has_service ? request_struct.service : ""); + if (serving_status == DefaultHealthCheckService::NOT_FOUND) { + return Status(StatusCode::NOT_FOUND, ""); + } + + // Encode response grpc_health_v1_HealthCheckResponse response_struct; response_struct.has_status = true; response_struct.status = - status == NOT_FOUND - ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN - : status == SERVING - ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING - : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; + serving_status == DefaultHealthCheckService::SERVING + ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING + : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; pb_ostream_t ostream; memset(&ostream, 0, sizeof(ostream)); pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields, @@ -236,282 +108,48 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( GRPC_SLICE_LENGTH(response_slice)); bool encode_status = pb_encode( &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct); - if (!encode_status) return false; + if (!encode_status) { + return Status(StatusCode::INTERNAL, "Failed to encode response."); + } Slice encoded_response(response_slice, Slice::STEAL_REF); ByteBuffer response_buffer(&encoded_response, 1); response->Swap(&response_buffer); - return true; -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr self = - std::make_shared(cq, database, service); - CheckCallHandler* handler = static_cast(self.get()); - { - std::unique_lock lock(service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request a Check() call. - handler->next_ = - CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, - &handler->writer_, cq, cq, &handler->next_); - } -} - -DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnCallReceived(std::shared_ptr self, bool ok) { - if (!ok) { - // The value of ok being false means that the server is shutting down. - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Process request. - gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, - this); - grpc::string service_name; - grpc::Status status = Status::OK; - ByteBuffer response; - if (!service_->DecodeRequest(request_, &service_name)) { - status = Status(StatusCode::INVALID_ARGUMENT, ""); - } else { - ServingStatus serving_status = database_->GetServingStatus(service_name); - if (serving_status == NOT_FOUND) { - status = Status(StatusCode::NOT_FOUND, "service name unknown"); - } else if (!service_->EncodeResponse(serving_status, &response)) { - status = Status(StatusCode::INTERNAL, ""); - } - } - // Send response. - { - std::unique_lock lock(service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - next_ = - CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - if (status.ok()) { - writer_.Finish(response, status, &next_); - } else { - writer_.FinishWithError(status, &next_); - } - } - } -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: - OnFinishDone(std::shared_ptr self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", - service_, this); - } -} - -// -// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler -// - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) { - std::shared_ptr self = - std::make_shared(cq, database, service); - WatchCallHandler* handler = static_cast(self.get()); - { - std::unique_lock lock(service->cq_shutdown_mu_); - if (service->shutdown_) return; - // Request AsyncNotifyWhenDone(). - handler->on_done_notified_ = - CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, - std::placeholders::_1, std::placeholders::_2), - self /* copies ref */); - handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); - // Request a Watch() call. - handler->next_ = - CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, - &handler->stream_, cq, cq, - &handler->next_); - } + return Status::OK; } -DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service) - : cq_(cq), - database_(database), - service_(service), - stream_(&ctx_), - call_state_(WAITING_FOR_CALL) {} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnCallReceived(std::shared_ptr self, bool ok) { - if (ok) { - call_state_ = CALL_RECEIVED; - } else { - // AsyncNotifyWhenDone() needs to be called before the call starts, but the - // tag will not pop out if the call never starts ( - // https://github.com/grpc/grpc/issues/10136). So we need to manually - // release the ownership of the handler in this case. - GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); - } - if (!ok || shutdown_) { - // The value of ok being false means that the server is shutting down. - Shutdown(std::move(self), "OnCallReceived"); - return; - } - // Spawn a new handler instance to serve the next new client. Every handler - // instance will deallocate itself when it's done. - CreateAndStart(cq_, database_, service_); - // Parse request. - if (!service_->DecodeRequest(request_, &service_name_)) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(Status(StatusCode::INVALID_ARGUMENT, ""), &on_finish_done_); - call_state_ = FINISH_CALLED; - return; - } - // Register the call for updates to the service. - gpr_log(GPR_DEBUG, - "[HCS %p] Health check watch started for service \"%s\" " - "(handler: %p)", - service_, service_name_.c_str(), this); - database_->RegisterCallHandler(service_name_, std::move(self)); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealth(std::shared_ptr self, ServingStatus status) { - std::unique_lock lock(mu_); - // If there's already a send in flight, cache the new status, and - // we'll start a new send for it when the one in flight completes. - if (send_in_flight_) { - pending_status_ = status; - return; - } - // Start a send. - SendHealthLocked(std::move(self), status); -} - -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - SendHealthLocked(std::shared_ptr self, ServingStatus status) { - std::unique_lock cq_lock(service_->cq_shutdown_mu_); - if (service_->shutdown_) { - cq_lock.release()->unlock(); - Shutdown(std::move(self), "SendHealthLocked"); - return; - } - send_in_flight_ = true; - call_state_ = SEND_MESSAGE_PENDING; - // Construct response. - ByteBuffer response; - if (!service_->EncodeResponse(status, &response)) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(Status(StatusCode::INTERNAL, ""), &on_finish_done_); - return; - } - next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Write(response, &next_); +DefaultHealthCheckService::DefaultHealthCheckService() { + services_map_.emplace("", true); } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnSendHealthDone(std::shared_ptr self, bool ok) { - if (!ok || shutdown_) { - Shutdown(std::move(self), "OnSendHealthDone"); - return; - } - call_state_ = CALL_RECEIVED; - { - std::unique_lock lock(mu_); - send_in_flight_ = false; - // If we got a new status since we started the last send, start a - // new send for it. - if (pending_status_ != NOT_FOUND) { - auto status = pending_status_; - pending_status_ = NOT_FOUND; - SendHealthLocked(std::move(self), status); - } - } +void DefaultHealthCheckService::SetServingStatus( + const grpc::string& service_name, bool serving) { + std::lock_guard lock(mu_); + services_map_[service_name] = serving; } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnDoneNotified(std::shared_ptr self, bool ok) { - GPR_ASSERT(ok); - done_notified_ = true; - if (ctx_.IsCancelled()) { - is_cancelled_ = true; +void DefaultHealthCheckService::SetServingStatus(bool serving) { + std::lock_guard lock(mu_); + for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) { + iter->second = serving; } - gpr_log(GPR_DEBUG, - "[HCS %p] Healt check call is notified done (handler: %p, " - "is_cancelled: %d).", - service_, this, static_cast(is_cancelled_)); - Shutdown(std::move(self), "OnDoneNotified"); } -// TODO(roth): This method currently assumes that there will be only one -// thread polling the cq and invoking the corresponding callbacks. If -// that changes, we will need to add synchronization here. -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - Shutdown(std::shared_ptr self, const char* reason) { - if (!shutdown_) { - gpr_log(GPR_DEBUG, - "[HCS %p] Shutting down the handler (service_name: \"%s\", " - "handler: %p, reason: %s).", - service_, service_name_.c_str(), this, reason); - shutdown_ = true; - } - // OnCallReceived() may be called after OnDoneNotified(), so we need to - // try to Finish() every time we are in Shutdown(). - if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) { - std::unique_lock lock(service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - // TODO(juanlishen): Maybe add a message proto for the client to - // explicitly cancel the stream so that we can return OK status in such - // cases. - stream_.Finish(Status::CANCELLED, &on_finish_done_); - call_state_ = FINISH_CALLED; - } +DefaultHealthCheckService::ServingStatus +DefaultHealthCheckService::GetServingStatus( + const grpc::string& service_name) const { + std::lock_guard lock(mu_); + const auto& iter = services_map_.find(service_name); + if (iter == services_map_.end()) { + return NOT_FOUND; } + return iter->second ? SERVING : NOT_SERVING; } -void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnFinishDone(std::shared_ptr self, bool ok) { - if (ok) { - gpr_log(GPR_DEBUG, - "[HCS %p] Health check call finished (service_name: \"%s\", " - "handler: %p).", - service_, service_name_.c_str(), this); - } +DefaultHealthCheckService::HealthCheckServiceImpl* +DefaultHealthCheckService::GetHealthCheckService() { + GPR_ASSERT(impl_ == nullptr); + impl_.reset(new HealthCheckServiceImpl(this)); + return impl_.get(); } } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index edad5949362..a1ce5aa64ec 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -19,268 +19,42 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H -#include #include -#include -#include -#include #include -#include -#include #include #include -#include "src/core/lib/gprpp/thd.h" - namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: - enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; - // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - // Base class for call handlers. - class CallHandler { - public: - virtual ~CallHandler() = default; - virtual void SendHealth(std::shared_ptr self, - ServingStatus status) = 0; - }; + explicit HealthCheckServiceImpl(DefaultHealthCheckService* service); - HealthCheckServiceImpl(DefaultHealthCheckService* database, - std::unique_ptr cq); - - ~HealthCheckServiceImpl(); - - void StartServingThread(); + Status Check(ServerContext* context, const ByteBuffer* request, + ByteBuffer* response); private: - // A tag that can be called with a bool argument. It's tailored for - // CallHandler's use. Before being used, it should be constructed with a - // method of CallHandler and a shared pointer to the handler. The - // shared pointer will be moved to the invoked function and the function - // can only be invoked once. That makes ref counting of the handler easier, - // because the shared pointer is not bound to the function and can be gone - // once the invoked function returns (if not used any more). - class CallableTag { - public: - using HandlerFunction = - std::function, bool)>; - - CallableTag() {} - - CallableTag(HandlerFunction func, std::shared_ptr handler) - : handler_function_(std::move(func)), handler_(std::move(handler)) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - } - - // Runs the tag. This should be called only once. The handler is no - // longer owned by this tag after this method is invoked. - void Run(bool ok) { - GPR_ASSERT(handler_function_ != nullptr); - GPR_ASSERT(handler_ != nullptr); - handler_function_(std::move(handler_), ok); - } - - // Releases and returns the shared pointer to the handler. - std::shared_ptr ReleaseHandler() { - return std::move(handler_); - } - - private: - HandlerFunction handler_function_ = nullptr; - std::shared_ptr handler_; - }; - - // Call handler for Check method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class CheckCallHandler : public CallHandler { - public: - // Instantiates a CheckCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - CheckCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // Not used for Check. - void SendHealth(std::shared_ptr self, - ServingStatus status) override {} - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr self, bool ok); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - GenericServerAsyncResponseWriter writer_; - ServerContext ctx_; - - CallableTag next_; - }; - - // Call handler for Watch method. - // Each handler takes care of one call. It contains per-call data and it - // will access the members of the parent class (i.e., - // DefaultHealthCheckService) for per-service health data. - class WatchCallHandler : public CallHandler { - public: - // Instantiates a WatchCallHandler and requests the next health check - // call. The handler object will manage its own lifetime, so no action is - // needed from the caller any more regarding that object. - static void CreateAndStart(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - // This ctor is public because we want to use std::make_shared<> in - // CreateAndStart(). This ctor shouldn't be used elsewhere. - WatchCallHandler(ServerCompletionQueue* cq, - DefaultHealthCheckService* database, - HealthCheckServiceImpl* service); - - void SendHealth(std::shared_ptr self, - ServingStatus status) override; - - private: - // Called when we receive a call. - // Spawns a new handler so that we can keep servicing future calls. - void OnCallReceived(std::shared_ptr self, bool ok); - - // Requires holding mu_. - void SendHealthLocked(std::shared_ptr self, - ServingStatus status); - - // When sending a health result finishes. - void OnSendHealthDone(std::shared_ptr self, bool ok); - - // Called when Finish() is done. - void OnFinishDone(std::shared_ptr self, bool ok); - - // Called when AsyncNotifyWhenDone() notifies us. - void OnDoneNotified(std::shared_ptr self, bool ok); - - void Shutdown(std::shared_ptr self, const char* reason); - - // The members passed down from HealthCheckServiceImpl. - ServerCompletionQueue* cq_; - DefaultHealthCheckService* database_; - HealthCheckServiceImpl* service_; - - ByteBuffer request_; - grpc::string service_name_; - GenericServerAsyncWriter stream_; - ServerContext ctx_; - - std::mutex mu_; - bool send_in_flight_ = false; // Guarded by mu_. - ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. - - // The state of the RPC progress. - enum CallState { - WAITING_FOR_CALL, - CALL_RECEIVED, - SEND_MESSAGE_PENDING, - FINISH_CALLED - } call_state_; - - bool shutdown_ = false; - bool done_notified_ = false; - bool is_cancelled_ = false; - CallableTag next_; - CallableTag on_done_notified_; - CallableTag on_finish_done_; - }; - - // Handles the incoming requests and drives the completion queue in a loop. - static void Serve(void* arg); - - // Returns true on success. - static bool DecodeRequest(const ByteBuffer& request, - grpc::string* service_name); - static bool EncodeResponse(ServingStatus status, ByteBuffer* response); - - // Needed to appease Windows compilers, which don't seem to allow - // nested classes to access protected members in the parent's - // superclass. - using Service::RequestAsyncServerStreaming; - using Service::RequestAsyncUnary; - - DefaultHealthCheckService* database_; - std::unique_ptr cq_; - internal::RpcServiceMethod* check_method_; - internal::RpcServiceMethod* watch_method_; - - // To synchronize the operations related to shutdown state of cq_, so that - // we don't enqueue new tags into cq_ after it is already shut down. - std::mutex cq_shutdown_mu_; - std::atomic_bool shutdown_{false}; - std::unique_ptr<::grpc_core::Thread> thread_; + const DefaultHealthCheckService* const service_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); - void SetServingStatus(const grpc::string& service_name, bool serving) override; void SetServingStatus(bool serving) override; - + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; ServingStatus GetServingStatus(const grpc::string& service_name) const; - - HealthCheckServiceImpl* GetHealthCheckService( - std::unique_ptr cq); + HealthCheckServiceImpl* GetHealthCheckService(); private: - // Stores the current serving status of a service and any call - // handlers registered for updates when the service's status changes. - class ServiceData { - public: - void SetServingStatus(ServingStatus status); - ServingStatus GetServingStatus() const { return status_; } - void AddCallHandler( - std::shared_ptr handler); - void RemoveCallHandler( - std::shared_ptr handler); - bool Unused() const { - return call_handlers_.empty() && status_ == NOT_FOUND; - } - - private: - ServingStatus status_ = NOT_FOUND; - std::set> - call_handlers_; - }; - - void RegisterCallHandler( - const grpc::string& service_name, - std::shared_ptr handler); - - void UnregisterCallHandler( - const grpc::string& service_name, - std::shared_ptr handler); - mutable std::mutex mu_; - std::map services_map_; // Guarded by mu_. + std::map services_map_; std::unique_ptr impl_; }; diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c index 5c214c7160f..09bd98a3d97 100644 --- a/src/cpp/server/health/health.pb.c +++ b/src/cpp/server/health/health.pb.c @@ -2,6 +2,7 @@ /* Generated by nanopb-0.3.7-dev */ #include "src/cpp/server/health/health.pb.h" + /* @@protoc_insertion_point(includes) */ #if PB_PROTO_HEADER_VERSION != 30 #error Regenerate this file with the current version of nanopb generator. diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h index 9d54ccd6182..29e1f3bacb1 100644 --- a/src/cpp/server/health/health.pb.h +++ b/src/cpp/server/health/health.pb.h @@ -17,12 +17,11 @@ extern "C" { typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus { grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0, grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1, - grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2, - grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3 + grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2 } grpc_health_v1_HealthCheckResponse_ServingStatus; #define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1)) +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1)) /* Struct definitions */ typedef struct _grpc_health_v1_HealthCheckRequest { diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 48f0f110a6c..b8ba7042d90 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -559,20 +559,16 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. - ServerCompletionQueue* health_check_cq = nullptr; - DefaultHealthCheckService::HealthCheckServiceImpl* - default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING); - grpc_server_register_completion_queue(server_, health_check_cq->cq(), - nullptr); - default_health_check_service_impl = - default_hc_service->GetHealthCheckService( - std::unique_ptr(health_check_cq)); - RegisterService(nullptr, default_health_check_service_impl); + if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) { + gpr_log(GPR_INFO, + "Default health check service disabled at async-only server."); + } else { + auto* default_hc_service = new DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + RegisterService(nullptr, default_hc_service->GetHealthCheckService()); + } } grpc_server_start(server_); @@ -587,9 +583,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } - if (health_check_cq != nullptr) { - new UnimplementedAsyncRequest(this, health_check_cq); - } } // If this server has any support for synchronous methods (has any sync @@ -602,10 +595,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } - - if (default_health_check_service_impl != nullptr) { - default_health_check_service_impl->StartServingThread(); - } } void Server::ShutdownInternal(gpr_timespec deadline) { diff --git a/src/proto/grpc/health/v1/health.proto b/src/proto/grpc/health/v1/health.proto index 38843ff1e73..4b4677b8a4d 100644 --- a/src/proto/grpc/health/v1/health.proto +++ b/src/proto/grpc/health/v1/health.proto @@ -34,30 +34,10 @@ message HealthCheckResponse { UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; - SERVICE_UNKNOWN = 3; // Used only by the Watch method. } ServingStatus status = 1; } service Health { - // If the requested service is unknown, the call will fail with status - // NOT_FOUND. rpc Check(HealthCheckRequest) returns (HealthCheckResponse); - - // Performs a watch for the serving status of the requested service. - // The server will immediately send back a message indicating the current - // serving status. It will then subsequently send a new message whenever - // the service's serving status changes. - // - // If the requested service is unknown when the call is received, the - // server will send a message setting the serving status to - // SERVICE_UNKNOWN but will *not* terminate the call. If at some - // future point, the serving status of the service becomes known, the - // server will send a new message with the service's serving status. - // - // If the call terminates with status UNIMPLEMENTED, then clients - // should assume this method is not supported and should not retry the - // call. If the call terminates with any other status (including OK), - // clients should retry the call with appropriate exponential backoff. - rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); } diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc index fca65dfc13b..1c48b9d151a 100644 --- a/test/cpp/end2end/health_service_end2end_test.cc +++ b/test/cpp/end2end/health_service_end2end_test.cc @@ -64,29 +64,6 @@ class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service { return Status::OK; } - Status Watch(ServerContext* context, const HealthCheckRequest* request, - ::grpc::ServerWriter* writer) override { - auto last_state = HealthCheckResponse::UNKNOWN; - while (!context->IsCancelled()) { - { - std::lock_guard lock(mu_); - HealthCheckResponse response; - auto iter = status_map_.find(request->service()); - if (iter == status_map_.end()) { - response.set_status(response.SERVICE_UNKNOWN); - } else { - response.set_status(iter->second); - } - if (response.status() != last_state) { - writer->Write(response, ::grpc::WriteOptions()); - } - } - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(1000, GPR_TIMESPAN))); - } - return Status::OK; - } - void SetStatus(const grpc::string& service_name, HealthCheckResponse::ServingStatus status) { std::lock_guard lock(mu_); @@ -129,6 +106,14 @@ class CustomHealthCheckService : public HealthCheckServiceInterface { HealthCheckServiceImpl* impl_; // not owned }; +void LoopCompletionQueue(ServerCompletionQueue* cq) { + void* tag; + bool ok; + while (cq->Next(&tag, &ok)) { + abort(); // Nothing should come out of the cq. + } +} + class HealthServiceEnd2endTest : public ::testing::Test { protected: HealthServiceEnd2endTest() {} @@ -233,33 +218,6 @@ class HealthServiceEnd2endTest : public ::testing::Test { Status(StatusCode::NOT_FOUND, "")); } - void VerifyHealthCheckServiceStreaming() { - const grpc::string kServiceName("service_name"); - HealthCheckServiceInterface* service = server_->GetHealthCheckService(); - // Start Watch for service. - ClientContext context; - HealthCheckRequest request; - request.set_service(kServiceName); - std::unique_ptr<::grpc::ClientReaderInterface> reader = - hc_stub_->Watch(&context, request); - // Initial response will be SERVICE_UNKNOWN. - HealthCheckResponse response; - EXPECT_TRUE(reader->Read(&response)); - EXPECT_EQ(response.SERVICE_UNKNOWN, response.status()); - response.Clear(); - // Now set service to NOT_SERVING and make sure we get an update. - service->SetServingStatus(kServiceName, false); - EXPECT_TRUE(reader->Read(&response)); - EXPECT_EQ(response.NOT_SERVING, response.status()); - response.Clear(); - // Now set service to SERVING and make sure we get another update. - service->SetServingStatus(kServiceName, true); - EXPECT_TRUE(reader->Read(&response)); - EXPECT_EQ(response.SERVING, response.status()); - // Finish call. - context.TryCancel(); - } - TestServiceImpl echo_test_service_; HealthCheckServiceImpl health_check_service_impl_; std::unique_ptr hc_stub_; @@ -287,7 +245,6 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) { EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); SetUpServer(true, false, false, nullptr); VerifyHealthCheckService(); - VerifyHealthCheckServiceStreaming(); // The default service has a size limit of the service name. const grpc::string kTooLongServiceName(201, 'x'); @@ -295,6 +252,22 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) { Status(StatusCode::INVALID_ARGUMENT, "")); } +// The server has no sync service. +TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsyncOnly) { + EnableDefaultHealthCheckService(true); + EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); + SetUpServer(false, true, false, nullptr); + cq_thread_ = std::thread(LoopCompletionQueue, cq_.get()); + + HealthCheckServiceInterface* default_service = + server_->GetHealthCheckService(); + EXPECT_TRUE(default_service == nullptr); + + ResetStubs(); + + SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, "")); +} + // Provide an empty service to disable the default service. TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) { EnableDefaultHealthCheckService(true); @@ -323,7 +296,6 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) { ResetStubs(); VerifyHealthCheckService(); - VerifyHealthCheckServiceStreaming(); } } // namespace diff --git a/tools/distrib/check_nanopb_output.sh b/tools/distrib/check_nanopb_output.sh index 1c2ef9b7686..6b98619c320 100755 --- a/tools/distrib/check_nanopb_output.sh +++ b/tools/distrib/check_nanopb_output.sh @@ -16,7 +16,6 @@ set -ex readonly NANOPB_ALTS_TMP_OUTPUT="$(mktemp -d)" -readonly NANOPB_HEALTH_TMP_OUTPUT="$(mktemp -d)" readonly NANOPB_TMP_OUTPUT="$(mktemp -d)" readonly PROTOBUF_INSTALL_PREFIX="$(mktemp -d)" @@ -68,23 +67,6 @@ if ! diff -r "$NANOPB_TMP_OUTPUT" src/core/ext/filters/client_channel/lb_policy/ exit 2 fi -# -# checks for health.proto -# -readonly HEALTH_GRPC_OUTPUT_PATH='src/cpp/server/health' -# nanopb-compile the proto to a temp location -./tools/codegen/core/gen_nano_proto.sh \ - src/proto/grpc/health/v1/health.proto \ - "$NANOPB_HEALTH_TMP_OUTPUT" \ - "$HEALTH_GRPC_OUTPUT_PATH" -# compare outputs to checked compiled code -for NANOPB_OUTPUT_FILE in $NANOPB_HEALTH_TMP_OUTPUT/*.pb.*; do - if ! diff "$NANOPB_OUTPUT_FILE" "src/cpp/server/health/$(basename $NANOPB_OUTPUT_FILE)"; then - echo "Outputs differ: $NANOPB_HEALTH_TMP_OUTPUT vs $HEALTH_GRPC_OUTPUT_PATH" - exit 2 - fi -done - # # Checks for handshaker.proto and transport_security_common.proto #