Simplify call state logic, create non-polling CQ, and some cleanup.

pull/16697/head
Mark D. Roth 6 years ago
parent be1ce0c4cc
commit 4d9ad11653
  1. 131
      src/cpp/server/health/default_health_check_service.cc
  2. 26
      src/cpp/server/health/default_health_check_service.h
  3. 7
      src/cpp/server/server_cc.cc

@ -132,13 +132,11 @@ DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
std::unique_ptr<ServerCompletionQueue> cq) std::unique_ptr<ServerCompletionQueue> cq)
: database_(database), cq_(std::move(cq)) { : database_(database), cq_(std::move(cq)) {
// Add Check() method. // Add Check() method.
check_method_ = new internal::RpcServiceMethod( AddMethod(new internal::RpcServiceMethod(
kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr); kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
AddMethod(check_method_);
// Add Watch() method. // Add Watch() method.
watch_method_ = new internal::RpcServiceMethod( AddMethod(new internal::RpcServiceMethod(
kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr); kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
AddMethod(watch_method_);
// Create serving thread. // Create serving thread.
thread_ = std::unique_ptr<::grpc_core::Thread>( thread_ = std::unique_ptr<::grpc_core::Thread>(
new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
@ -161,10 +159,6 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
HealthCheckServiceImpl* service = HealthCheckServiceImpl* service =
reinterpret_cast<HealthCheckServiceImpl*>(arg); reinterpret_cast<HealthCheckServiceImpl*>(arg);
// TODO(juanlishen): This is a workaround to wait for the cq to be ready.
// Need to figure out why cq is not ready after service starts.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(1, GPR_TIMESPAN)));
CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_, CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_,
service); service);
WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_, WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_,
@ -289,13 +283,13 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
grpc::Status status = Status::OK; grpc::Status status = Status::OK;
ByteBuffer response; ByteBuffer response;
if (!service_->DecodeRequest(request_, &service_name)) { if (!service_->DecodeRequest(request_, &service_name)) {
status = Status(StatusCode::INVALID_ARGUMENT, ""); status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
} else { } else {
ServingStatus serving_status = database_->GetServingStatus(service_name); ServingStatus serving_status = database_->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) { if (serving_status == NOT_FOUND) {
status = Status(StatusCode::NOT_FOUND, "service name unknown"); status = Status(StatusCode::NOT_FOUND, "service name unknown");
} else if (!service_->EncodeResponse(serving_status, &response)) { } else if (!service_->EncodeResponse(serving_status, &response)) {
status = Status(StatusCode::INTERNAL, ""); status = Status(StatusCode::INTERNAL, "could not encode response");
} }
} }
// Send response. // Send response.
@ -361,23 +355,18 @@ DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
: cq_(cq), : cq_(cq),
database_(database), database_(database),
service_(service), service_(service),
stream_(&ctx_), stream_(&ctx_) {}
call_state_(WAITING_FOR_CALL) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) { if (!ok) {
call_state_ = CALL_RECEIVED; // Server shutting down.
} else { //
// AsyncNotifyWhenDone() needs to be called before the call starts, but the // AsyncNotifyWhenDone() needs to be called before the call starts, but the
// tag will not pop out if the call never starts ( // tag will not pop out if the call never starts (
// https://github.com/grpc/grpc/issues/10136). So we need to manually // https://github.com/grpc/grpc/issues/10136). So we need to manually
// release the ownership of the handler in this case. // release the ownership of the handler in this case.
GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
}
if (!ok || shutdown_) {
// The value of ok being false means that the server is shutting down.
Shutdown(std::move(self), "OnCallReceived");
return; return;
} }
// Spawn a new handler instance to serve the next new client. Every handler // Spawn a new handler instance to serve the next new client. Every handler
@ -385,25 +374,20 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
CreateAndStart(cq_, database_, service_); CreateAndStart(cq_, database_, service_);
// Parse request. // Parse request.
if (!service_->DecodeRequest(request_, &service_name_)) { if (!service_->DecodeRequest(request_, &service_name_)) {
on_finish_done_ = SendFinish(std::move(self),
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Finish(Status(StatusCode::INVALID_ARGUMENT, ""), &on_finish_done_);
call_state_ = FINISH_CALLED;
return; return;
} }
// Register the call for updates to the service. // Register the call for updates to the service.
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"[HCS %p] Health check watch started for service \"%s\" " "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
"(handler: %p)",
service_, service_name_.c_str(), this); service_, service_name_.c_str(), this);
database_->RegisterCallHandler(service_name_, std::move(self)); database_->RegisterCallHandler(service_name_, std::move(self));
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(send_mu_);
// If there's already a send in flight, cache the new status, and // If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes. // we'll start a new send for it when the one in flight completes.
if (send_in_flight_) { if (send_in_flight_) {
@ -416,22 +400,19 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) { SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
send_in_flight_ = true;
// Construct response.
ByteBuffer response;
bool success = service_->EncodeResponse(status, &response);
// Grab shutdown lock and send response.
std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_); std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
if (service_->shutdown_) { if (service_->shutdown_) {
cq_lock.release()->unlock(); SendFinishLocked(std::move(self), Status::CANCELLED);
Shutdown(std::move(self), "SendHealthLocked");
return; return;
} }
send_in_flight_ = true; if (!success) {
call_state_ = SEND_MESSAGE_PENDING; SendFinishLocked(std::move(self),
// Construct response. Status(StatusCode::INTERNAL, "could not encode response"));
ByteBuffer response;
if (!service_->EncodeResponse(status, &response)) {
on_finish_done_ =
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Finish(Status(StatusCode::INTERNAL, ""), &on_finish_done_);
return; return;
} }
next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
@ -442,13 +423,11 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) { OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok || shutdown_) { if (!ok) {
Shutdown(std::move(self), "OnSendHealthDone"); SendFinish(std::move(self), Status::CANCELLED);
return; return;
} }
call_state_ = CALL_RECEIVED; std::unique_lock<std::mutex> lock(send_mu_);
{
std::unique_lock<std::mutex> lock(mu_);
send_in_flight_ = false; send_in_flight_ = false;
// If we got a new status since we started the last send, start a // If we got a new status since we started the last send, start a
// new send for it. // new send for it.
@ -457,61 +436,47 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
pending_status_ = NOT_FOUND; pending_status_ = NOT_FOUND;
SendHealthLocked(std::move(self), status); SendHealthLocked(std::move(self), status);
} }
}
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
GPR_ASSERT(ok); if (finish_called_) return;
done_notified_ = true; std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
if (ctx_.IsCancelled()) { if (!service_->shutdown_) return;
is_cancelled_ = true; SendFinishLocked(std::move(self), status);
}
gpr_log(GPR_DEBUG,
"[HCS %p] Healt check call is notified done (handler: %p, "
"is_cancelled: %d).",
service_, this, static_cast<int>(is_cancelled_));
Shutdown(std::move(self), "OnDoneNotified");
} }
// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks. If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
Shutdown(std::shared_ptr<CallHandler> self, const char* reason) { SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
if (!shutdown_) {
gpr_log(GPR_DEBUG,
"[HCS %p] Shutting down the handler (service_name: \"%s\", "
"handler: %p, reason: %s).",
service_, service_name_.c_str(), this, reason);
shutdown_ = true;
}
// OnCallReceived() may be called after OnDoneNotified(), so we need to
// try to Finish() every time we are in Shutdown().
if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
on_finish_done_ = on_finish_done_ =
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2), std::placeholders::_1, std::placeholders::_2),
std::move(self)); std::move(self));
// TODO(juanlishen): Maybe add a message proto for the client to stream_.Finish(status, &on_finish_done_);
// explicitly cancel the stream so that we can return OK status in such finish_called_ = true;
// cases.
stream_.Finish(Status::CANCELLED, &on_finish_done_);
call_state_ = FINISH_CALLED;
}
}
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) { if (ok) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"[HCS %p] Health check call finished (service_name: \"%s\", " "[HCS %p] Health watch call finished (service_name: \"%s\", "
"handler: %p).", "handler: %p).",
service_, service_name_.c_str(), this); service_, service_name_.c_str(), this);
} }
} }
// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks. If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
GPR_ASSERT(ok);
gpr_log(GPR_DEBUG,
"[HCS %p] Healt watch call is notified done (handler: %p, "
"is_cancelled: %d).",
service_, this, static_cast<int>(ctx_.IsCancelled()));
SendFinish(std::move(self), Status::CANCELLED);
}
} // namespace grpc } // namespace grpc

@ -168,21 +168,25 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// Spawns a new handler so that we can keep servicing future calls. // Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Requires holding mu_. // Requires holding send_mu_.
void SendHealthLocked(std::shared_ptr<CallHandler> self, void SendHealthLocked(std::shared_ptr<CallHandler> self,
ServingStatus status); ServingStatus status);
// When sending a health result finishes. // When sending a health result finishes.
void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
// Requires holding service_->cq_shutdown_mu_.
void SendFinishLocked(std::shared_ptr<CallHandler> self,
const Status& status);
// Called when Finish() is done. // Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// Called when AsyncNotifyWhenDone() notifies us. // Called when AsyncNotifyWhenDone() notifies us.
void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
void Shutdown(std::shared_ptr<CallHandler> self, const char* reason);
// The members passed down from HealthCheckServiceImpl. // The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_; ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_; DefaultHealthCheckService* database_;
@ -193,21 +197,11 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
GenericServerAsyncWriter stream_; GenericServerAsyncWriter stream_;
ServerContext ctx_; ServerContext ctx_;
std::mutex mu_; std::mutex send_mu_;
bool send_in_flight_ = false; // Guarded by mu_. bool send_in_flight_ = false; // Guarded by mu_.
ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
// The state of the RPC progress. bool finish_called_ = false;
enum CallState {
WAITING_FOR_CALL,
CALL_RECEIVED,
SEND_MESSAGE_PENDING,
FINISH_CALLED
} call_state_;
bool shutdown_ = false;
bool done_notified_ = false;
bool is_cancelled_ = false;
CallableTag next_; CallableTag next_;
CallableTag on_done_notified_; CallableTag on_done_notified_;
CallableTag on_finish_done_; CallableTag on_finish_done_;
@ -229,8 +223,6 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
DefaultHealthCheckService* database_; DefaultHealthCheckService* database_;
std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<ServerCompletionQueue> cq_;
internal::RpcServiceMethod* check_method_;
internal::RpcServiceMethod* watch_method_;
// To synchronize the operations related to shutdown state of cq_, so that // To synchronize the operations related to shutdown state of cq_, so that
// we don't enqueue new tags into cq_ after it is already shut down. // we don't enqueue new tags into cq_ after it is already shut down.

@ -380,7 +380,6 @@ class Server::SyncRequestThreadManager : public ThreadManager {
int cq_timeout_msec_; int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
std::unique_ptr<internal::RpcServiceMethod> unknown_method_; std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
std::unique_ptr<internal::RpcServiceMethod> health_check_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
}; };
@ -566,7 +565,11 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
DefaultHealthCheckServiceEnabled()) { DefaultHealthCheckServiceEnabled()) {
auto* default_hc_service = new DefaultHealthCheckService; auto* default_hc_service = new DefaultHealthCheckService;
health_check_service_.reset(default_hc_service); health_check_service_.reset(default_hc_service);
health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING); // We create a non-polling CQ to avoid impacting application
// performance. This ensures that we don't introduce thread hops
// for application requests that wind up on this CQ, which is polled
// in its own thread.
health_check_cq = new ServerCompletionQueue(GRPC_CQ_NON_POLLING);
grpc_server_register_completion_queue(server_, health_check_cq->cq(), grpc_server_register_completion_queue(server_, health_check_cq->cq(),
nullptr); nullptr);
default_health_check_service_impl = default_health_check_service_impl =

Loading…
Cancel
Save