From 4d9ad11653f76787c9af76c62ae18f7c7ea53c3d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 25 Sep 2018 11:44:28 -0700 Subject: [PATCH] Simplify call state logic, create non-polling CQ, and some cleanup. --- .../health/default_health_check_service.cc | 153 +++++++----------- .../health/default_health_check_service.h | 26 ++- src/cpp/server/server_cc.cc | 7 +- 3 files changed, 73 insertions(+), 113 deletions(-) diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index fc3db1bba73..e86c05c4b0b 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -132,13 +132,11 @@ DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( std::unique_ptr cq) : database_(database), cq_(std::move(cq)) { // Add Check() method. - check_method_ = new internal::RpcServiceMethod( - kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr); - AddMethod(check_method_); + AddMethod(new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); // Add Watch() method. - watch_method_ = new internal::RpcServiceMethod( - kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr); - AddMethod(watch_method_); + AddMethod(new internal::RpcServiceMethod( + kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); // Create serving thread. thread_ = std::unique_ptr<::grpc_core::Thread>( new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); @@ -161,10 +159,6 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { HealthCheckServiceImpl* service = reinterpret_cast(arg); - // TODO(juanlishen): This is a workaround to wait for the cq to be ready. - // Need to figure out why cq is not ready after service starts. 
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(1, GPR_TIMESPAN))); CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_, service); WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_, @@ -289,13 +283,13 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: grpc::Status status = Status::OK; ByteBuffer response; if (!service_->DecodeRequest(request_, &service_name)) { - status = Status(StatusCode::INVALID_ARGUMENT, ""); + status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request"); } else { ServingStatus serving_status = database_->GetServingStatus(service_name); if (serving_status == NOT_FOUND) { status = Status(StatusCode::NOT_FOUND, "service name unknown"); } else if (!service_->EncodeResponse(serving_status, &response)) { - status = Status(StatusCode::INTERNAL, ""); + status = Status(StatusCode::INTERNAL, "could not encode response"); } } // Send response. @@ -361,23 +355,18 @@ DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: : cq_(cq), database_(database), service_(service), - stream_(&ctx_), - call_state_(WAITING_FOR_CALL) {} + stream_(&ctx_) {} void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: OnCallReceived(std::shared_ptr self, bool ok) { - if (ok) { - call_state_ = CALL_RECEIVED; - } else { + if (!ok) { + // Server shutting down. + // // AsyncNotifyWhenDone() needs to be called before the call starts, but the // tag will not pop out if the call never starts ( // https://github.com/grpc/grpc/issues/10136). So we need to manually // release the ownership of the handler in this case. GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); - } - if (!ok || shutdown_) { - // The value of ok being false means that the server is shutting down. - Shutdown(std::move(self), "OnCallReceived"); return; } // Spawn a new handler instance to serve the next new client. 
Every handler @@ -385,25 +374,20 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: CreateAndStart(cq_, database_, service_); // Parse request. if (!service_->DecodeRequest(request_, &service_name_)) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(Status(StatusCode::INVALID_ARGUMENT, ""), &on_finish_done_); - call_state_ = FINISH_CALLED; + SendFinish(std::move(self), + Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); return; } // Register the call for updates to the service. gpr_log(GPR_DEBUG, - "[HCS %p] Health check watch started for service \"%s\" " - "(handler: %p)", + "[HCS %p] Health watch started for service \"%s\" (handler: %p)", service_, service_name_.c_str(), this); database_->RegisterCallHandler(service_name_, std::move(self)); } void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendHealth(std::shared_ptr self, ServingStatus status) { - std::unique_lock lock(mu_); + std::unique_lock lock(send_mu_); // If there's already a send in flight, cache the new status, and // we'll start a new send for it when the one in flight completes. if (send_in_flight_) { @@ -416,22 +400,19 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendHealthLocked(std::shared_ptr self, ServingStatus status) { + send_in_flight_ = true; + // Construct response. + ByteBuffer response; + bool success = service_->EncodeResponse(status, &response); + // Grab shutdown lock and send response. 
std::unique_lock cq_lock(service_->cq_shutdown_mu_); if (service_->shutdown_) { - cq_lock.release()->unlock(); - Shutdown(std::move(self), "SendHealthLocked"); + SendFinishLocked(std::move(self), Status::CANCELLED); return; } - send_in_flight_ = true; - call_state_ = SEND_MESSAGE_PENDING; - // Construct response. - ByteBuffer response; - if (!service_->EncodeResponse(status, &response)) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - stream_.Finish(Status(StatusCode::INTERNAL, ""), &on_finish_done_); + if (!success) { + SendFinishLocked(std::move(self), + Status(StatusCode::INTERNAL, "could not encode response")); return; } next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, @@ -442,76 +423,60 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: OnSendHealthDone(std::shared_ptr self, bool ok) { - if (!ok || shutdown_) { - Shutdown(std::move(self), "OnSendHealthDone"); + if (!ok) { + SendFinish(std::move(self), Status::CANCELLED); return; } - call_state_ = CALL_RECEIVED; - { - std::unique_lock lock(mu_); - send_in_flight_ = false; - // If we got a new status since we started the last send, start a - // new send for it. - if (pending_status_ != NOT_FOUND) { - auto status = pending_status_; - pending_status_ = NOT_FOUND; - SendHealthLocked(std::move(self), status); - } + std::unique_lock lock(send_mu_); + send_in_flight_ = false; + // If we got a new status since we started the last send, start a + // new send for it. 
+ if (pending_status_ != NOT_FOUND) { + auto status = pending_status_; + pending_status_ = NOT_FOUND; + SendHealthLocked(std::move(self), status); } } void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - OnDoneNotified(std::shared_ptr self, bool ok) { - GPR_ASSERT(ok); - done_notified_ = true; - if (ctx_.IsCancelled()) { - is_cancelled_ = true; - } - gpr_log(GPR_DEBUG, - "[HCS %p] Healt check call is notified done (handler: %p, " - "is_cancelled: %d).", - service_, this, static_cast(is_cancelled_)); - Shutdown(std::move(self), "OnDoneNotified"); + SendFinish(std::shared_ptr self, const Status& status) { + if (finish_called_) return; + std::unique_lock cq_lock(service_->cq_shutdown_mu_); + if (service_->shutdown_) return; + SendFinishLocked(std::move(self), status); } -// TODO(roth): This method currently assumes that there will be only one -// thread polling the cq and invoking the corresponding callbacks. If -// that changes, we will need to add synchronization here. void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: - Shutdown(std::shared_ptr self, const char* reason) { - if (!shutdown_) { - gpr_log(GPR_DEBUG, - "[HCS %p] Shutting down the handler (service_name: \"%s\", " - "handler: %p, reason: %s).", - service_, service_name_.c_str(), this, reason); - shutdown_ = true; - } - // OnCallReceived() may be called after OnDoneNotified(), so we need to - // try to Finish() every time we are in Shutdown(). - if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) { - std::unique_lock lock(service_->cq_shutdown_mu_); - if (!service_->shutdown_) { - on_finish_done_ = - CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, - std::placeholders::_1, std::placeholders::_2), - std::move(self)); - // TODO(juanlishen): Maybe add a message proto for the client to - // explicitly cancel the stream so that we can return OK status in such - // cases. 
- stream_.Finish(Status::CANCELLED, &on_finish_done_); - call_state_ = FINISH_CALLED; - } - } + SendFinishLocked(std::shared_ptr self, const Status& status) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Finish(status, &on_finish_done_); + finish_called_ = true; } void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: OnFinishDone(std::shared_ptr self, bool ok) { if (ok) { gpr_log(GPR_DEBUG, - "[HCS %p] Health check call finished (service_name: \"%s\", " + "[HCS %p] Health watch call finished (service_name: \"%s\", " "handler: %p).", service_, service_name_.c_str(), this); } } +// TODO(roth): This method currently assumes that there will be only one +// thread polling the cq and invoking the corresponding callbacks. If +// that changes, we will need to add synchronization here. +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnDoneNotified(std::shared_ptr self, bool ok) { + GPR_ASSERT(ok); + gpr_log(GPR_DEBUG, + "[HCS %p] Health watch call is notified done (handler: %p, " + "is_cancelled: %d).", + service_, this, static_cast(ctx_.IsCancelled())); + SendFinish(std::move(self), Status::CANCELLED); +} + } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index edad5949362..3bab76b6b03 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -168,21 +168,25 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // Spawns a new handler so that we can keep servicing future calls. void OnCallReceived(std::shared_ptr self, bool ok); - // Requires holding mu_. + // Requires holding send_mu_. void SendHealthLocked(std::shared_ptr self, ServingStatus status); // When sending a health result finishes. 
void OnSendHealthDone(std::shared_ptr self, bool ok); + void SendFinish(std::shared_ptr self, const Status& status); + + // Requires holding service_->cq_shutdown_mu_. + void SendFinishLocked(std::shared_ptr self, + const Status& status); + // Called when Finish() is done. void OnFinishDone(std::shared_ptr self, bool ok); // Called when AsyncNotifyWhenDone() notifies us. void OnDoneNotified(std::shared_ptr self, bool ok); - void Shutdown(std::shared_ptr self, const char* reason); - // The members passed down from HealthCheckServiceImpl. ServerCompletionQueue* cq_; DefaultHealthCheckService* database_; @@ -193,21 +197,11 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { GenericServerAsyncWriter stream_; ServerContext ctx_; - std::mutex mu_; + std::mutex send_mu_; bool send_in_flight_ = false; // Guarded by mu_. ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. - // The state of the RPC progress. - enum CallState { - WAITING_FOR_CALL, - CALL_RECEIVED, - SEND_MESSAGE_PENDING, - FINISH_CALLED - } call_state_; - - bool shutdown_ = false; - bool done_notified_ = false; - bool is_cancelled_ = false; + bool finish_called_ = false; CallableTag next_; CallableTag on_done_notified_; CallableTag on_finish_done_; @@ -229,8 +223,6 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { DefaultHealthCheckService* database_; std::unique_ptr cq_; - internal::RpcServiceMethod* check_method_; - internal::RpcServiceMethod* watch_method_; // To synchronize the operations related to shutdown state of cq_, so that // we don't enqueue new tags into cq_ after it is already shut down. 
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index bf612d2d246..f1f971bf74d 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -380,7 +380,6 @@ class Server::SyncRequestThreadManager : public ThreadManager { int cq_timeout_msec_; std::vector> sync_requests_; std::unique_ptr unknown_method_; - std::unique_ptr health_check_; std::shared_ptr global_callbacks_; }; @@ -566,7 +565,11 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { DefaultHealthCheckServiceEnabled()) { auto* default_hc_service = new DefaultHealthCheckService; health_check_service_.reset(default_hc_service); - health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING); + // We create a non-polling CQ to avoid impacting application + // performance. This ensures that we don't introduce thread hops + // for application requests that wind up on this CQ, which is polled + // in its own thread. + health_check_cq = new ServerCompletionQueue(GRPC_CQ_NON_POLLING); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl =