health check service: rewrite using callback API (#29457)

pull/29566/head
Mark D. Roth 3 years ago committed by GitHub
parent 1334f26bb5
commit 2b00c7d2ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 386
      src/cpp/server/health/default_health_check_service.cc
  2. 233
      src/cpp/server/health/default_health_check_service.h
  3. 29
      src/cpp/server/server_cc.cc

@ -44,7 +44,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {
void DefaultHealthCheckService::SetServingStatus(
const std::string& service_name, bool serving) {
grpc_core::MutexLock lock(&mu_);
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) {
// Set to NOT_SERVING in case service_name is not in the map.
serving = false;
@ -54,10 +54,8 @@ void DefaultHealthCheckService::SetServingStatus(
void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING;
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
return;
}
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) return;
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
service_data.SetServingStatus(status);
@ -65,10 +63,8 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) {
}
void DefaultHealthCheckService::Shutdown() {
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
return;
}
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) return;
shutdown_ = true;
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
@ -79,43 +75,37 @@ void DefaultHealthCheckService::Shutdown() {
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
const std::string& service_name) const {
grpc_core::MutexLock lock(&mu_);
grpc::internal::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) {
return NOT_FOUND;
}
if (it == services_map_.end()) return NOT_FOUND;
const ServiceData& service_data = it->second;
return service_data.GetServingStatus();
}
void DefaultHealthCheckService::RegisterCallHandler(
void DefaultHealthCheckService::RegisterWatch(
const std::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
grpc_core::MutexLock lock(&mu_);
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
grpc::internal::MutexLock lock(&mu_);
ServiceData& service_data = services_map_[service_name];
service_data.AddCallHandler(handler /* copies ref */);
HealthCheckServiceImpl::CallHandler* h = handler.get();
h->SendHealth(std::move(handler), service_data.GetServingStatus());
watcher->SendHealth(service_data.GetServingStatus());
service_data.AddWatch(std::move(watcher));
}
void DefaultHealthCheckService::UnregisterCallHandler(
void DefaultHealthCheckService::UnregisterWatch(
const std::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
grpc_core::MutexLock lock(&mu_);
HealthCheckServiceImpl::WatchReactor* watcher) {
grpc::internal::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return;
ServiceData& service_data = it->second;
service_data.RemoveCallHandler(handler);
if (service_data.Unused()) {
services_map_.erase(it);
}
service_data.RemoveWatch(watcher);
if (service_data.Unused()) services_map_.erase(it);
}
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq) {
DefaultHealthCheckService::GetHealthCheckService() {
GPR_ASSERT(impl_ == nullptr);
impl_ = absl::make_unique<HealthCheckServiceImpl>(this, std::move(cq));
impl_ = absl::make_unique<HealthCheckServiceImpl>(this);
return impl_.get();
}
@ -126,19 +116,19 @@ DefaultHealthCheckService::GetHealthCheckService(
void DefaultHealthCheckService::ServiceData::SetServingStatus(
ServingStatus status) {
status_ = status;
for (auto& call_handler : call_handlers_) {
call_handler->SendHealth(call_handler /* copies ref */, status);
for (const auto& p : watchers_) {
p.first->SendHealth(status);
}
}
void DefaultHealthCheckService::ServiceData::AddCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
call_handlers_.insert(std::move(handler));
void DefaultHealthCheckService::ServiceData::AddWatch(
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
watchers_[watcher.get()] = std::move(watcher);
}
void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
call_handlers_.erase(handler);
void DefaultHealthCheckService::ServiceData::RemoveWatch(
HealthCheckServiceImpl::WatchReactor* watcher) {
watchers_.erase(watcher);
}
//
@ -151,53 +141,57 @@ const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq)
: database_(database), cq_(std::move(cq)) {
DefaultHealthCheckService* database)
: database_(database) {
// Add Check() method.
AddMethod(new internal::RpcServiceMethod(
kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
MarkMethodCallback(
0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
[database](CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response) {
return HandleCheckRequest(database, context, request, response);
}));
// Add Watch() method.
AddMethod(new internal::RpcServiceMethod(
kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
// Create serving thread.
thread_ = absl::make_unique<grpc_core::Thread>("grpc_health_check_service",
Serve, this);
MarkMethodCallback(
1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
[this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
return new WatchReactor(this, request);
}));
}
DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
// We will reach here after the server starts shutting down.
grpc::internal::MutexLock lock(&mu_);
shutdown_ = true;
{
grpc_core::MutexLock lock(&cq_shutdown_mu_);
cq_->Shutdown();
while (num_watches_ > 0) {
shutdown_condition_.Wait(&mu_);
}
thread_->Join();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
// Request the calls we're interested in.
// We do this before starting the serving thread, so that we know it's
// done before server startup is complete.
CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
// Start serving thread.
thread_->Start();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
void* tag;
bool ok;
while (true) {
if (!service->cq_->Next(&tag, &ok)) {
// The completion queue is shutting down.
GPR_ASSERT(service->shutdown_);
break;
}
auto* next_step = static_cast<CallableTag*>(tag);
next_step->Run(ok);
ServerUnaryReactor*
DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest(
DefaultHealthCheckService* database, CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response) {
auto* reactor = context->DefaultReactor();
std::string service_name;
if (!DecodeRequest(*request, &service_name)) {
reactor->Finish(
Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
return reactor;
}
ServingStatus serving_status = database->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown"));
return reactor;
}
if (!EncodeResponse(serving_status, response)) {
reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response"));
return reactor;
}
reactor->Finish(Status::OK);
return reactor;
}
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
@ -248,245 +242,97 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor
//
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<CheckCallHandler>(cq, database, service);
CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor(
HealthCheckServiceImpl* service, const ByteBuffer* request)
: service_(service) {
{
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request a Check() call.
handler->next_ =
CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
&handler->writer_, cq, cq, &handler->next_);
grpc::internal::MutexLock lock(&service_->mu_);
++service_->num_watches_;
}
}
DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// The value of ok being false means that the server is shutting down.
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Process request.
gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
this);
std::string service_name;
grpc::Status status = Status::OK;
ByteBuffer response;
if (!service_->DecodeRequest(request_, &service_name)) {
status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
} else {
ServingStatus serving_status = database_->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
status = Status(StatusCode::NOT_FOUND, "service name unknown");
} else if (!service_->EncodeResponse(serving_status, &response)) {
status = Status(StatusCode::INTERNAL, "could not encode response");
}
}
// Send response.
{
grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
next_ =
CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
if (status.ok()) {
writer_.Finish(response, status, &next_);
} else {
writer_.FinishWithError(status, &next_);
}
}
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
service_, this);
}
self.reset(); // To appease clang-tidy.
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
//
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<WatchCallHandler>(cq, database, service);
WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
{
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request AsyncNotifyWhenDone().
handler->on_done_notified_ =
CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
std::placeholders::_1, std::placeholders::_2),
self /* copies ref */);
handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
// Request a Watch() call.
handler->next_ =
CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
&handler->stream_, cq, cq,
&handler->next_);
}
}
DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// Server shutting down.
//
// AsyncNotifyWhenDone() needs to be called before the call starts, but the
// tag will not pop out if the call never starts (
// https://github.com/grpc/grpc/issues/10136). So we need to manually
// release the ownership of the handler in this case.
GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Parse request.
if (!service_->DecodeRequest(request_, &service_name_)) {
SendFinish(std::move(self),
Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
if (!DecodeRequest(*request, &service_name_)) {
Finish(Status(StatusCode::INTERNAL, "could not parse request"));
return;
}
// Register the call for updates to the service.
gpr_log(GPR_DEBUG,
"[HCS %p] Health watch started for service \"%s\" (handler: %p)",
"[HCS %p] Health watch started for service \"%s\" (reactor: %p)",
service_, service_name_.c_str(), this);
database_->RegisterCallHandler(service_name_, std::move(self));
service_->database_->RegisterWatch(service_name_, Ref());
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
grpc_core::MutexLock lock(&send_mu_);
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealth(ServingStatus status) {
grpc::internal::MutexLock lock(&mu_);
// If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes.
if (send_in_flight_) {
if (write_pending_) {
pending_status_ = status;
return;
}
// Start a send.
SendHealthLocked(std::move(self), status);
SendHealthLocked(status);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
send_in_flight_ = true;
// Construct response.
ByteBuffer response;
bool success = service_->EncodeResponse(status, &response);
// Grab shutdown lock and send response.
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
SendFinishLocked(std::move(self), Status::CANCELLED);
return;
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealthLocked(ServingStatus status) {
// Check if we're shutting down.
{
grpc::internal::MutexLock lock(&service_->mu_);
if (service_->shutdown_) {
Finish(Status::CANCELLED);
return;
}
}
// Send response.
bool success = EncodeResponse(status, &response_);
if (!success) {
SendFinishLocked(std::move(self),
Status(StatusCode::INTERNAL, "could not encode response"));
Finish(Status(StatusCode::INTERNAL, "could not encode response"));
return;
}
next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Write(response, &next_);
write_pending_ = true;
StartWrite(&response_);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnWriteDone(bool ok) {
response_.Clear();
if (!ok) {
SendFinish(std::move(self), Status::CANCELLED);
Finish(Status::CANCELLED);
return;
}
grpc_core::MutexLock lock(&send_mu_);
send_in_flight_ = false;
grpc::internal::MutexLock lock(&mu_);
write_pending_ = false;
// If we got a new status since we started the last send, start a
// new send for it.
if (pending_status_ != NOT_FOUND) {
auto status = pending_status_;
pending_status_ = NOT_FOUND;
SendHealthLocked(std::move(self), status);
SendHealthLocked(status);
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
if (finish_called_) return;
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) return;
SendFinishLocked(std::move(self), status);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
on_finish_done_ =
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Finish(status, &on_finish_done_);
finish_called_ = true;
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnCancel() {
Finish(Status(StatusCode::UNKNOWN, "call cancelled by client"));
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG,
"[HCS %p] Health watch call finished (service_name: \"%s\", "
"handler: %p).",
service_, service_name_.c_str(), this);
}
self.reset(); // To appease clang-tidy.
}
// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks. If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
GPR_ASSERT(ok);
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() {
gpr_log(GPR_DEBUG,
"[HCS %p] Health watch call is notified done (handler: %p, "
"is_cancelled: %d).",
service_, this, static_cast<int>(ctx_.IsCancelled()));
database_->UnregisterCallHandler(service_name_, self);
SendFinish(std::move(self), Status::CANCELLED);
"[HCS %p] Health watch call finished (service_name: \"%s\", "
"watcher: %p).",
service_, service_name_.c_str(), this);
service_->database_->UnregisterWatch(service_name_, this);
{
grpc::internal::MutexLock lock(&service_->mu_);
if (--service_->num_watches_ == 0) {
service_->shutdown_condition_.Signal();
}
}
// Free the initial ref from instantiation.
Unref();
}
} // namespace grpc

@ -25,14 +25,12 @@
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/server_callback_handlers.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
namespace grpc {
@ -45,191 +43,52 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
// Base class for call handlers.
class CallHandler {
// Reactor for handling Watch streams.
class WatchReactor : public ServerWriteReactor<ByteBuffer>,
public grpc_core::RefCounted<WatchReactor> {
public:
virtual ~CallHandler() = default;
virtual void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) = 0;
};
HealthCheckServiceImpl(DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq);
~HealthCheckServiceImpl() override;
WatchReactor(HealthCheckServiceImpl* service, const ByteBuffer* request);
void StartServingThread();
void SendHealth(ServingStatus status);
private:
// A tag that can be called with a bool argument. It's tailored for
// CallHandler's use. Before being used, it should be constructed with a
// method of CallHandler and a shared pointer to the handler. The
// shared pointer will be moved to the invoked function and the function
// can only be invoked once. That makes ref counting of the handler easier,
// because the shared pointer is not bound to the function and can be gone
// once the invoked function returns (if not used any more).
class CallableTag {
public:
using HandlerFunction =
std::function<void(std::shared_ptr<CallHandler>, bool)>;
CallableTag() {}
CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
: handler_function_(std::move(func)), handler_(std::move(handler)) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
}
// Runs the tag. This should be called only once. The handler is no
// longer owned by this tag after this method is invoked.
void Run(bool ok) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
handler_function_(std::move(handler_), ok);
}
// Releases and returns the shared pointer to the handler.
std::shared_ptr<CallHandler> ReleaseHandler() {
return std::move(handler_);
}
void OnWriteDone(bool ok) override;
void OnCancel() override;
void OnDone() override;
private:
HandlerFunction handler_function_ = nullptr;
std::shared_ptr<CallHandler> handler_;
};
void SendHealthLocked(ServingStatus status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
// Call handler for Check method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class CheckCallHandler : public CallHandler {
public:
// Instantiates a CheckCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// Not used for Check.
void SendHealth(std::shared_ptr<CallHandler> /*self*/,
ServingStatus /*status*/) override {}
private:
// Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_;
std::string service_name_;
ByteBuffer response_;
ByteBuffer request_;
GenericServerAsyncResponseWriter writer_;
ServerContext ctx_;
CallableTag next_;
grpc::internal::Mutex mu_;
bool write_pending_ ABSL_GUARDED_BY(mu_) = false;
ServingStatus pending_status_ ABSL_GUARDED_BY(mu_) = NOT_FOUND;
};
// Call handler for Watch method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class WatchCallHandler : public CallHandler {
public:
// Instantiates a WatchCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
explicit HealthCheckServiceImpl(DefaultHealthCheckService* database);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) override;
private:
// Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Requires holding send_mu_.
void SendHealthLocked(std::shared_ptr<CallHandler> self,
ServingStatus status);
// When sending a health result finishes.
void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
// Requires holding service_->cq_shutdown_mu_.
void SendFinishLocked(std::shared_ptr<CallHandler> self,
const Status& status);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// Called when AsyncNotifyWhenDone() notifies us.
void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_;
ByteBuffer request_;
std::string service_name_;
GenericServerAsyncWriter stream_;
ServerContext ctx_;
grpc_core::Mutex send_mu_;
bool send_in_flight_ = false; // Guarded by mu_.
ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
bool finish_called_ = false;
CallableTag next_;
CallableTag on_done_notified_;
CallableTag on_finish_done_;
};
~HealthCheckServiceImpl() override;
// Handles the incoming requests and drives the completion queue in a loop.
static void Serve(void* arg);
private:
// Request handler for Check method.
static ServerUnaryReactor* HandleCheckRequest(
DefaultHealthCheckService* database, CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response);
// Returns true on success.
static bool DecodeRequest(const ByteBuffer& request,
std::string* service_name);
static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
// Needed to appease Windows compilers, which don't seem to allow
// nested classes to access protected members in the parent's
// superclass.
using Service::RequestAsyncServerStreaming;
using Service::RequestAsyncUnary;
DefaultHealthCheckService* database_;
std::unique_ptr<ServerCompletionQueue> cq_;
// To synchronize the operations related to shutdown state of cq_, so that
// we don't enqueue new tags into cq_ after it is already shut down.
grpc_core::Mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false};
std::unique_ptr<grpc_core::Thread> thread_;
grpc::internal::Mutex mu_;
grpc::internal::CondVar shutdown_condition_;
bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
size_t num_watches_ ABSL_GUARDED_BY(mu_) = 0;
};
DefaultHealthCheckService();
@ -241,8 +100,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
ServingStatus GetServingStatus(const std::string& service_name) const;
HealthCheckServiceImpl* GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq);
HealthCheckServiceImpl* GetHealthCheckService();
private:
// Stores the current serving status of a service and any call
@ -251,31 +109,28 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
void SetServingStatus(ServingStatus status);
ServingStatus GetServingStatus() const { return status_; }
void AddCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void RemoveCallHandler(
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
bool Unused() const {
return call_handlers_.empty() && status_ == NOT_FOUND;
}
void AddWatch(
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher);
void RemoveWatch(HealthCheckServiceImpl::WatchReactor* watcher);
bool Unused() const { return watchers_.empty() && status_ == NOT_FOUND; }
private:
ServingStatus status_ = NOT_FOUND;
std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
call_handlers_;
std::map<HealthCheckServiceImpl::WatchReactor*,
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor>>
watchers_;
};
void RegisterCallHandler(
void RegisterWatch(
const std::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher);
void UnregisterCallHandler(
const std::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
void UnregisterWatch(const std::string& service_name,
HealthCheckServiceImpl::WatchReactor* watcher);
mutable grpc_core::Mutex mu_;
bool shutdown_ = false; // Guarded by mu_.
std::map<std::string, ServiceData> services_map_; // Guarded by mu_.
mutable grpc::internal::Mutex mu_;
bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
std::map<std::string, ServiceData> services_map_ ABSL_GUARDED_BY(&mu_);
std::unique_ptr<HealthCheckServiceImpl> impl_;
};

@ -1031,6 +1031,7 @@ bool Server::RegisterService(const std::string* addr, grpc::Service* service) {
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
grpc_server_register_completion_queue(server_, cq->cq(), nullptr);
grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator(
cq->cq(), method_registration_tag, [this, cq, method_value] {
grpc_core::Server::RegisteredCallAllocation result;
@ -1122,25 +1123,12 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
grpc::ServerCompletionQueue* health_check_cq = nullptr;
grpc::DefaultHealthCheckService::HealthCheckServiceImpl*
default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
grpc::DefaultHealthCheckServiceEnabled()) {
auto* default_hc_service = new grpc::DefaultHealthCheckService;
health_check_service_.reset(default_hc_service);
// We create a non-polling CQ to avoid impacting application
// performance. This ensures that we don't introduce thread hops
// for application requests that wind up on this CQ, which is polled
// in its own thread.
health_check_cq = new grpc::ServerCompletionQueue(
GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
grpc_server_register_completion_queue(server_, health_check_cq->cq(),
nullptr);
default_health_check_service_impl =
default_hc_service->GetHealthCheckService(
std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
RegisterService(nullptr, default_health_check_service_impl);
auto default_hc_service = absl::make_unique<DefaultHealthCheckService>();
auto* hc_service_impl = default_hc_service->GetHealthCheckService();
health_check_service_ = std::move(default_hc_service);
RegisterService(nullptr, hc_service_impl);
}
for (auto& acceptor : acceptors_) {
@ -1187,9 +1175,6 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
if (health_check_cq != nullptr) {
new UnimplementedAsyncRequest(this, health_check_cq);
}
unknown_rpc_needed = false;
}
@ -1206,10 +1191,6 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
value->Start();
}
if (default_health_check_service_impl != nullptr) {
default_health_check_service_impl->StartServingThread();
}
for (auto& acceptor : acceptors_) {
acceptor->Start();
}

Loading…
Cancel
Save