Revert "health check service: rewrite using callback API (#29457)" (#29559)

This reverts commit 2b00c7d2ad.
pull/29514/head^2
Craig Tiller 3 years ago committed by GitHub
parent 815029fe2b
commit d4680eb8eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 386
      src/cpp/server/health/default_health_check_service.cc
  2. 233
      src/cpp/server/health/default_health_check_service.h
  3. 29
      src/cpp/server/server_cc.cc

@ -44,7 +44,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {
void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(
const std::string& service_name, bool serving) { const std::string& service_name, bool serving) {
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
if (shutdown_) { if (shutdown_) {
// Set to NOT_SERVING in case service_name is not in the map. // Set to NOT_SERVING in case service_name is not in the map.
serving = false; serving = false;
@ -54,8 +54,10 @@ void DefaultHealthCheckService::SetServingStatus(
void DefaultHealthCheckService::SetServingStatus(bool serving) { void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING; const ServingStatus status = serving ? SERVING : NOT_SERVING;
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
if (shutdown_) return; if (shutdown_) {
return;
}
for (auto& p : services_map_) { for (auto& p : services_map_) {
ServiceData& service_data = p.second; ServiceData& service_data = p.second;
service_data.SetServingStatus(status); service_data.SetServingStatus(status);
@ -63,8 +65,10 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) {
} }
void DefaultHealthCheckService::Shutdown() { void DefaultHealthCheckService::Shutdown() {
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
if (shutdown_) return; if (shutdown_) {
return;
}
shutdown_ = true; shutdown_ = true;
for (auto& p : services_map_) { for (auto& p : services_map_) {
ServiceData& service_data = p.second; ServiceData& service_data = p.second;
@ -75,37 +79,43 @@ void DefaultHealthCheckService::Shutdown() {
DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus( DefaultHealthCheckService::GetServingStatus(
const std::string& service_name) const { const std::string& service_name) const {
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name); auto it = services_map_.find(service_name);
if (it == services_map_.end()) return NOT_FOUND; if (it == services_map_.end()) {
return NOT_FOUND;
}
const ServiceData& service_data = it->second; const ServiceData& service_data = it->second;
return service_data.GetServingStatus(); return service_data.GetServingStatus();
} }
void DefaultHealthCheckService::RegisterWatch( void DefaultHealthCheckService::RegisterCallHandler(
const std::string& service_name, const std::string& service_name,
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) { std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
ServiceData& service_data = services_map_[service_name]; ServiceData& service_data = services_map_[service_name];
watcher->SendHealth(service_data.GetServingStatus()); service_data.AddCallHandler(handler /* copies ref */);
service_data.AddWatch(std::move(watcher)); HealthCheckServiceImpl::CallHandler* h = handler.get();
h->SendHealth(std::move(handler), service_data.GetServingStatus());
} }
void DefaultHealthCheckService::UnregisterWatch( void DefaultHealthCheckService::UnregisterCallHandler(
const std::string& service_name, const std::string& service_name,
HealthCheckServiceImpl::WatchReactor* watcher) { const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name); auto it = services_map_.find(service_name);
if (it == services_map_.end()) return; if (it == services_map_.end()) return;
ServiceData& service_data = it->second; ServiceData& service_data = it->second;
service_data.RemoveWatch(watcher); service_data.RemoveCallHandler(handler);
if (service_data.Unused()) services_map_.erase(it); if (service_data.Unused()) {
services_map_.erase(it);
}
} }
DefaultHealthCheckService::HealthCheckServiceImpl* DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService() { DefaultHealthCheckService::GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq) {
GPR_ASSERT(impl_ == nullptr); GPR_ASSERT(impl_ == nullptr);
impl_ = absl::make_unique<HealthCheckServiceImpl>(this); impl_ = absl::make_unique<HealthCheckServiceImpl>(this, std::move(cq));
return impl_.get(); return impl_.get();
} }
@ -116,19 +126,19 @@ DefaultHealthCheckService::GetHealthCheckService() {
void DefaultHealthCheckService::ServiceData::SetServingStatus( void DefaultHealthCheckService::ServiceData::SetServingStatus(
ServingStatus status) { ServingStatus status) {
status_ = status; status_ = status;
for (const auto& p : watchers_) { for (auto& call_handler : call_handlers_) {
p.first->SendHealth(status); call_handler->SendHealth(call_handler /* copies ref */, status);
} }
} }
void DefaultHealthCheckService::ServiceData::AddWatch( void DefaultHealthCheckService::ServiceData::AddCallHandler(
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) { std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
watchers_[watcher.get()] = std::move(watcher); call_handlers_.insert(std::move(handler));
} }
void DefaultHealthCheckService::ServiceData::RemoveWatch( void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
HealthCheckServiceImpl::WatchReactor* watcher) { const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
watchers_.erase(watcher); call_handlers_.erase(handler);
} }
// //
@ -141,57 +151,53 @@ const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace } // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* database) DefaultHealthCheckService* database,
: database_(database) { std::unique_ptr<ServerCompletionQueue> cq)
: database_(database), cq_(std::move(cq)) {
// Add Check() method. // Add Check() method.
AddMethod(new internal::RpcServiceMethod( AddMethod(new internal::RpcServiceMethod(
kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr)); kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
MarkMethodCallback(
0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
[database](CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response) {
return HandleCheckRequest(database, context, request, response);
}));
// Add Watch() method. // Add Watch() method.
AddMethod(new internal::RpcServiceMethod( AddMethod(new internal::RpcServiceMethod(
kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr)); kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
MarkMethodCallback( // Create serving thread.
1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>( thread_ = absl::make_unique<grpc_core::Thread>("grpc_health_check_service",
[this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) { Serve, this);
return new WatchReactor(this, request);
}));
} }
DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
grpc::internal::MutexLock lock(&mu_); // We will reach here after the server starts shutting down.
shutdown_ = true; shutdown_ = true;
while (num_watches_ > 0) { {
shutdown_condition_.Wait(&mu_); grpc_core::MutexLock lock(&cq_shutdown_mu_);
cq_->Shutdown();
} }
thread_->Join();
} }
ServerUnaryReactor* void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest( // Request the calls we're interested in.
DefaultHealthCheckService* database, CallbackServerContext* context, // We do this before starting the serving thread, so that we know it's
const ByteBuffer* request, ByteBuffer* response) { // done before server startup is complete.
auto* reactor = context->DefaultReactor(); CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
std::string service_name; WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
if (!DecodeRequest(*request, &service_name)) { // Start serving thread.
reactor->Finish( thread_->Start();
Status(StatusCode::INVALID_ARGUMENT, "could not parse request")); }
return reactor;
} void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
ServingStatus serving_status = database->GetServingStatus(service_name); HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
if (serving_status == NOT_FOUND) { void* tag;
reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown")); bool ok;
return reactor; while (true) {
} if (!service->cq_->Next(&tag, &ok)) {
if (!EncodeResponse(serving_status, response)) { // The completion queue is shutting down.
reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response")); GPR_ASSERT(service->shutdown_);
return reactor; break;
}
auto* next_step = static_cast<CallableTag*>(tag);
next_step->Run(ok);
} }
reactor->Finish(Status::OK);
return reactor;
} }
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
@ -242,97 +248,245 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
} }
// //
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor // DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
// //
DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor( void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
HealthCheckServiceImpl* service, const ByteBuffer* request) CreateAndStart(ServerCompletionQueue* cq,
: service_(service) { DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<CheckCallHandler>(cq, database, service);
CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
{ {
grpc::internal::MutexLock lock(&service_->mu_); grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
++service_->num_watches_; if (service->shutdown_) return;
// Request a Check() call.
handler->next_ =
CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
&handler->writer_, cq, cq, &handler->next_);
} }
if (!DecodeRequest(*request, &service_name_)) { }
Finish(Status(StatusCode::INTERNAL, "could not parse request"));
DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// The value of ok being false means that the server is shutting down.
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Process request.
gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
this);
std::string service_name;
grpc::Status status = Status::OK;
ByteBuffer response;
if (!service_->DecodeRequest(request_, &service_name)) {
status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
} else {
ServingStatus serving_status = database_->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
status = Status(StatusCode::NOT_FOUND, "service name unknown");
} else if (!service_->EncodeResponse(serving_status, &response)) {
status = Status(StatusCode::INTERNAL, "could not encode response");
}
}
// Send response.
{
grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
next_ =
CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
if (status.ok()) {
writer_.Finish(response, status, &next_);
} else {
writer_.FinishWithError(status, &next_);
}
}
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
service_, this);
}
self.reset(); // To appease clang-tidy.
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
//
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<WatchCallHandler>(cq, database, service);
WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
{
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request AsyncNotifyWhenDone().
handler->on_done_notified_ =
CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
std::placeholders::_1, std::placeholders::_2),
self /* copies ref */);
handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
// Request a Watch() call.
handler->next_ =
CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
&handler->stream_, cq, cq,
&handler->next_);
}
}
DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// Server shutting down.
//
// AsyncNotifyWhenDone() needs to be called before the call starts, but the
// tag will not pop out if the call never starts (
// https://github.com/grpc/grpc/issues/10136). So we need to manually
// release the ownership of the handler in this case.
GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Parse request.
if (!service_->DecodeRequest(request_, &service_name_)) {
SendFinish(std::move(self),
Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
return; return;
} }
// Register the call for updates to the service. // Register the call for updates to the service.
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"[HCS %p] Health watch started for service \"%s\" (reactor: %p)", "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
service_, service_name_.c_str(), this); service_, service_name_.c_str(), this);
service_->database_->RegisterWatch(service_name_, Ref()); database_->RegisterCallHandler(service_name_, std::move(self));
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(ServingStatus status) { SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&send_mu_);
// If there's already a send in flight, cache the new status, and // If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes. // we'll start a new send for it when the one in flight completes.
if (write_pending_) { if (send_in_flight_) {
pending_status_ = status; pending_status_ = status;
return; return;
} }
// Start a send. // Start a send.
SendHealthLocked(status); SendHealthLocked(std::move(self), status);
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealthLocked(ServingStatus status) { SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
// Check if we're shutting down. send_in_flight_ = true;
{ // Construct response.
grpc::internal::MutexLock lock(&service_->mu_); ByteBuffer response;
if (service_->shutdown_) { bool success = service_->EncodeResponse(status, &response);
Finish(Status::CANCELLED); // Grab shutdown lock and send response.
return; grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
} if (service_->shutdown_) {
SendFinishLocked(std::move(self), Status::CANCELLED);
return;
} }
// Send response.
bool success = EncodeResponse(status, &response_);
if (!success) { if (!success) {
Finish(Status(StatusCode::INTERNAL, "could not encode response")); SendFinishLocked(std::move(self),
Status(StatusCode::INTERNAL, "could not encode response"));
return; return;
} }
write_pending_ = true; next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
StartWrite(&response_); std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Write(response, &next_);
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnWriteDone(bool ok) { OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
response_.Clear();
if (!ok) { if (!ok) {
Finish(Status::CANCELLED); SendFinish(std::move(self), Status::CANCELLED);
return; return;
} }
grpc::internal::MutexLock lock(&mu_); grpc_core::MutexLock lock(&send_mu_);
write_pending_ = false; send_in_flight_ = false;
// If we got a new status since we started the last send, start a // If we got a new status since we started the last send, start a
// new send for it. // new send for it.
if (pending_status_ != NOT_FOUND) { if (pending_status_ != NOT_FOUND) {
auto status = pending_status_; auto status = pending_status_;
pending_status_ = NOT_FOUND; pending_status_ = NOT_FOUND;
SendHealthLocked(status); SendHealthLocked(std::move(self), status);
} }
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnCancel() { SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
Finish(Status(StatusCode::UNKNOWN, "call cancelled by client")); if (finish_called_) return;
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) return;
SendFinishLocked(std::move(self), status);
} }
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() { void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
gpr_log(GPR_DEBUG, SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
"[HCS %p] Health watch call finished (service_name: \"%s\", " on_finish_done_ =
"watcher: %p).", CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
service_, service_name_.c_str(), this); std::placeholders::_1, std::placeholders::_2),
service_->database_->UnregisterWatch(service_name_, this); std::move(self));
{ stream_.Finish(status, &on_finish_done_);
grpc::internal::MutexLock lock(&service_->mu_); finish_called_ = true;
if (--service_->num_watches_ == 0) { }
service_->shutdown_condition_.Signal();
} void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG,
"[HCS %p] Health watch call finished (service_name: \"%s\", "
"handler: %p).",
service_, service_name_.c_str(), this);
} }
// Free the initial ref from instantiation. self.reset(); // To appease clang-tidy.
Unref(); }
// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks. If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
GPR_ASSERT(ok);
gpr_log(GPR_DEBUG,
"[HCS %p] Health watch call is notified done (handler: %p, "
"is_cancelled: %d).",
service_, this, static_cast<int>(ctx_.IsCancelled()));
database_->UnregisterCallHandler(service_name_, self);
SendFinish(std::move(self), Status::CANCELLED);
} }
} // namespace grpc } // namespace grpc

@ -25,12 +25,14 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h> #include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/server_callback_handlers.h> #include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h> #include <grpcpp/support/byte_buffer.h>
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/thd.h"
namespace grpc { namespace grpc {
@ -43,52 +45,191 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// The service impl to register with the server. // The service impl to register with the server.
class HealthCheckServiceImpl : public Service { class HealthCheckServiceImpl : public Service {
public: public:
// Reactor for handling Watch streams. // Base class for call handlers.
class WatchReactor : public ServerWriteReactor<ByteBuffer>, class CallHandler {
public grpc_core::RefCounted<WatchReactor> {
public: public:
WatchReactor(HealthCheckServiceImpl* service, const ByteBuffer* request); virtual ~CallHandler() = default;
virtual void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) = 0;
};
HealthCheckServiceImpl(DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq);
~HealthCheckServiceImpl() override;
void SendHealth(ServingStatus status); void StartServingThread();
void OnWriteDone(bool ok) override; private:
void OnCancel() override; // A tag that can be called with a bool argument. It's tailored for
void OnDone() override; // CallHandler's use. Before being used, it should be constructed with a
// method of CallHandler and a shared pointer to the handler. The
// shared pointer will be moved to the invoked function and the function
// can only be invoked once. That makes ref counting of the handler easier,
// because the shared pointer is not bound to the function and can be gone
// once the invoked function returns (if not used any more).
class CallableTag {
public:
using HandlerFunction =
std::function<void(std::shared_ptr<CallHandler>, bool)>;
CallableTag() {}
CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
: handler_function_(std::move(func)), handler_(std::move(handler)) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
}
// Runs the tag. This should be called only once. The handler is no
// longer owned by this tag after this method is invoked.
void Run(bool ok) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
handler_function_(std::move(handler_), ok);
}
// Releases and returns the shared pointer to the handler.
std::shared_ptr<CallHandler> ReleaseHandler() {
return std::move(handler_);
}
private: private:
void SendHealthLocked(ServingStatus status) HandlerFunction handler_function_ = nullptr;
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); std::shared_ptr<CallHandler> handler_;
};
// Call handler for Check method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class CheckCallHandler : public CallHandler {
public:
// Instantiates a CheckCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// Not used for Check.
void SendHealth(std::shared_ptr<CallHandler> /*self*/,
ServingStatus /*status*/) override {}
private:
// Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_; HealthCheckServiceImpl* service_;
std::string service_name_;
ByteBuffer response_;
grpc::internal::Mutex mu_; ByteBuffer request_;
bool write_pending_ ABSL_GUARDED_BY(mu_) = false; GenericServerAsyncResponseWriter writer_;
ServingStatus pending_status_ ABSL_GUARDED_BY(mu_) = NOT_FOUND; ServerContext ctx_;
CallableTag next_;
}; };
explicit HealthCheckServiceImpl(DefaultHealthCheckService* database); // Call handler for Watch method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class WatchCallHandler : public CallHandler {
public:
// Instantiates a WatchCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
~HealthCheckServiceImpl() override; // This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
private: void SendHealth(std::shared_ptr<CallHandler> self,
// Request handler for Check method. ServingStatus status) override;
static ServerUnaryReactor* HandleCheckRequest(
DefaultHealthCheckService* database, CallbackServerContext* context, private:
const ByteBuffer* request, ByteBuffer* response); // Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Requires holding send_mu_.
void SendHealthLocked(std::shared_ptr<CallHandler> self,
ServingStatus status);
// When sending a health result finishes.
void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
// Requires holding service_->cq_shutdown_mu_.
void SendFinishLocked(std::shared_ptr<CallHandler> self,
const Status& status);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// Called when AsyncNotifyWhenDone() notifies us.
void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_;
ByteBuffer request_;
std::string service_name_;
GenericServerAsyncWriter stream_;
ServerContext ctx_;
grpc_core::Mutex send_mu_;
bool send_in_flight_ = false; // Guarded by mu_.
ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
bool finish_called_ = false;
CallableTag next_;
CallableTag on_done_notified_;
CallableTag on_finish_done_;
};
// Handles the incoming requests and drives the completion queue in a loop.
static void Serve(void* arg);
// Returns true on success. // Returns true on success.
static bool DecodeRequest(const ByteBuffer& request, static bool DecodeRequest(const ByteBuffer& request,
std::string* service_name); std::string* service_name);
static bool EncodeResponse(ServingStatus status, ByteBuffer* response); static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
// Needed to appease Windows compilers, which don't seem to allow
// nested classes to access protected members in the parent's
// superclass.
using Service::RequestAsyncServerStreaming;
using Service::RequestAsyncUnary;
DefaultHealthCheckService* database_; DefaultHealthCheckService* database_;
std::unique_ptr<ServerCompletionQueue> cq_;
grpc::internal::Mutex mu_; // To synchronize the operations related to shutdown state of cq_, so that
grpc::internal::CondVar shutdown_condition_; // we don't enqueue new tags into cq_ after it is already shut down.
bool shutdown_ ABSL_GUARDED_BY(mu_) = false; grpc_core::Mutex cq_shutdown_mu_;
size_t num_watches_ ABSL_GUARDED_BY(mu_) = 0; std::atomic_bool shutdown_{false};
std::unique_ptr<grpc_core::Thread> thread_;
}; };
DefaultHealthCheckService(); DefaultHealthCheckService();
@ -100,7 +241,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
ServingStatus GetServingStatus(const std::string& service_name) const; ServingStatus GetServingStatus(const std::string& service_name) const;
HealthCheckServiceImpl* GetHealthCheckService(); HealthCheckServiceImpl* GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq);
private: private:
// Stores the current serving status of a service and any call // Stores the current serving status of a service and any call
@ -109,28 +251,31 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public: public:
void SetServingStatus(ServingStatus status); void SetServingStatus(ServingStatus status);
ServingStatus GetServingStatus() const { return status_; } ServingStatus GetServingStatus() const { return status_; }
void AddWatch( void AddCallHandler(
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher); std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void RemoveWatch(HealthCheckServiceImpl::WatchReactor* watcher); void RemoveCallHandler(
bool Unused() const { return watchers_.empty() && status_ == NOT_FOUND; } const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
bool Unused() const {
return call_handlers_.empty() && status_ == NOT_FOUND;
}
private: private:
ServingStatus status_ = NOT_FOUND; ServingStatus status_ = NOT_FOUND;
std::map<HealthCheckServiceImpl::WatchReactor*, std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor>> call_handlers_;
watchers_;
}; };
void RegisterWatch( void RegisterCallHandler(
const std::string& service_name, const std::string& service_name,
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher); std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void UnregisterWatch(const std::string& service_name, void UnregisterCallHandler(
HealthCheckServiceImpl::WatchReactor* watcher); const std::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
mutable grpc::internal::Mutex mu_; mutable grpc_core::Mutex mu_;
bool shutdown_ ABSL_GUARDED_BY(&mu_) = false; bool shutdown_ = false; // Guarded by mu_.
std::map<std::string, ServiceData> services_map_ ABSL_GUARDED_BY(&mu_); std::map<std::string, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_; std::unique_ptr<HealthCheckServiceImpl> impl_;
}; };

@ -1031,7 +1031,6 @@ bool Server::RegisterService(const std::string* addr, grpc::Service* service) {
has_callback_methods_ = true; has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get(); grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ(); grpc::CompletionQueue* cq = CallbackCQ();
grpc_server_register_completion_queue(server_, cq->cq(), nullptr);
grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator( grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator(
cq->cq(), method_registration_tag, [this, cq, method_value] { cq->cq(), method_registration_tag, [this, cq, method_value] {
grpc_core::Server::RegisteredCallAllocation result; grpc_core::Server::RegisteredCallAllocation result;
@ -1123,12 +1122,25 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an // Only create default health check service when user did not provide an
// explicit one. // explicit one.
grpc::ServerCompletionQueue* health_check_cq = nullptr;
grpc::DefaultHealthCheckService::HealthCheckServiceImpl*
default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ && if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
grpc::DefaultHealthCheckServiceEnabled()) { grpc::DefaultHealthCheckServiceEnabled()) {
auto default_hc_service = absl::make_unique<DefaultHealthCheckService>(); auto* default_hc_service = new grpc::DefaultHealthCheckService;
auto* hc_service_impl = default_hc_service->GetHealthCheckService(); health_check_service_.reset(default_hc_service);
health_check_service_ = std::move(default_hc_service); // We create a non-polling CQ to avoid impacting application
RegisterService(nullptr, hc_service_impl); // performance. This ensures that we don't introduce thread hops
// for application requests that wind up on this CQ, which is polled
// in its own thread.
health_check_cq = new grpc::ServerCompletionQueue(
GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
grpc_server_register_completion_queue(server_, health_check_cq->cq(),
nullptr);
default_health_check_service_impl =
default_hc_service->GetHealthCheckService(
std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
RegisterService(nullptr, default_health_check_service_impl);
} }
for (auto& acceptor : acceptors_) { for (auto& acceptor : acceptors_) {
@ -1175,6 +1187,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]); new UnimplementedAsyncRequest(this, cqs[i]);
} }
} }
if (health_check_cq != nullptr) {
new UnimplementedAsyncRequest(this, health_check_cq);
}
unknown_rpc_needed = false; unknown_rpc_needed = false;
} }
@ -1191,6 +1206,10 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
value->Start(); value->Start();
} }
if (default_health_check_service_impl != nullptr) {
default_health_check_service_impl->StartServingThread();
}
for (auto& acceptor : acceptors_) { for (auto& acceptor : acceptors_) {
acceptor->Start(); acceptor->Start();
} }

Loading…
Cancel
Save