Revert "Second attempt: health check service: rewrite using callback … (#29813)

* Revert "Second attempt: health check service: rewrite using callback API (#29562)"

This reverts commit 90ee4e85f5.

* Automated change: Fix sanity tests

* iwyu

* iwyu fix

* Automated change: Fix sanity tests

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
reviewable/pr29832/r1
Craig Tiller 3 years ago committed by GitHub
parent 59ea410d17
commit 897bc2c100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 413
      src/cpp/server/health/default_health_check_service.cc
  2. 244
      src/cpp/server/health/default_health_check_service.h
  3. 67
      src/cpp/server/server_cc.cc

@ -18,6 +18,7 @@
#include "src/cpp/server/health/default_health_check_service.h"
#include <stddef.h>
#include <stdint.h>
#include <memory>
@ -29,7 +30,6 @@
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/server_callback_handlers.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/support/slice.h>
@ -50,7 +50,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {
void DefaultHealthCheckService::SetServingStatus(
const std::string& service_name, bool serving) {
grpc::internal::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
// Set to NOT_SERVING in case service_name is not in the map.
serving = false;
@ -60,8 +60,10 @@ void DefaultHealthCheckService::SetServingStatus(
void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING;
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) return;
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
return;
}
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
service_data.SetServingStatus(status);
@ -69,8 +71,10 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) {
}
void DefaultHealthCheckService::Shutdown() {
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) return;
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
return;
}
shutdown_ = true;
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
@ -81,37 +85,43 @@ void DefaultHealthCheckService::Shutdown() {
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
const std::string& service_name) const {
grpc::internal::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return NOT_FOUND;
if (it == services_map_.end()) {
return NOT_FOUND;
}
const ServiceData& service_data = it->second;
return service_data.GetServingStatus();
}
void DefaultHealthCheckService::RegisterWatch(
void DefaultHealthCheckService::RegisterCallHandler(
const std::string& service_name,
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
grpc::internal::MutexLock lock(&mu_);
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
grpc_core::MutexLock lock(&mu_);
ServiceData& service_data = services_map_[service_name];
watcher->SendHealth(service_data.GetServingStatus());
service_data.AddWatch(std::move(watcher));
service_data.AddCallHandler(handler /* copies ref */);
HealthCheckServiceImpl::CallHandler* h = handler.get();
h->SendHealth(std::move(handler), service_data.GetServingStatus());
}
void DefaultHealthCheckService::UnregisterWatch(
void DefaultHealthCheckService::UnregisterCallHandler(
const std::string& service_name,
HealthCheckServiceImpl::WatchReactor* watcher) {
grpc::internal::MutexLock lock(&mu_);
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return;
ServiceData& service_data = it->second;
service_data.RemoveWatch(watcher);
if (service_data.Unused()) services_map_.erase(it);
service_data.RemoveCallHandler(handler);
if (service_data.Unused()) {
services_map_.erase(it);
}
}
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService() {
DefaultHealthCheckService::GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq) {
GPR_ASSERT(impl_ == nullptr);
impl_ = absl::make_unique<HealthCheckServiceImpl>(this);
impl_ = absl::make_unique<HealthCheckServiceImpl>(this, std::move(cq));
return impl_.get();
}
@ -122,19 +132,19 @@ DefaultHealthCheckService::GetHealthCheckService() {
void DefaultHealthCheckService::ServiceData::SetServingStatus(
ServingStatus status) {
status_ = status;
for (const auto& p : watchers_) {
p.first->SendHealth(status);
for (auto& call_handler : call_handlers_) {
call_handler->SendHealth(call_handler /* copies ref */, status);
}
}
void DefaultHealthCheckService::ServiceData::AddWatch(
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
watchers_[watcher.get()] = std::move(watcher);
void DefaultHealthCheckService::ServiceData::AddCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
call_handlers_.insert(std::move(handler));
}
void DefaultHealthCheckService::ServiceData::RemoveWatch(
HealthCheckServiceImpl::WatchReactor* watcher) {
watchers_.erase(watcher);
void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
call_handlers_.erase(handler);
}
//
@ -147,57 +157,53 @@ const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* database)
: database_(database) {
DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq)
: database_(database), cq_(std::move(cq)) {
// Add Check() method.
AddMethod(new internal::RpcServiceMethod(
kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
MarkMethodCallback(
0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
[database](CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response) {
return HandleCheckRequest(database, context, request, response);
}));
// Add Watch() method.
AddMethod(new internal::RpcServiceMethod(
kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
MarkMethodCallback(
1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
[this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
return new WatchReactor(this, request);
}));
// Create serving thread.
thread_ = absl::make_unique<grpc_core::Thread>("grpc_health_check_service",
Serve, this);
}
DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
grpc::internal::MutexLock lock(&mu_);
// We will reach here after the server starts shutting down.
shutdown_ = true;
while (num_watches_ > 0) {
shutdown_condition_.Wait(&mu_);
{
grpc_core::MutexLock lock(&cq_shutdown_mu_);
cq_->Shutdown();
}
thread_->Join();
}
ServerUnaryReactor*
DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest(
DefaultHealthCheckService* database, CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response) {
auto* reactor = context->DefaultReactor();
std::string service_name;
if (!DecodeRequest(*request, &service_name)) {
reactor->Finish(
Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
return reactor;
}
ServingStatus serving_status = database->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown"));
return reactor;
}
if (!EncodeResponse(serving_status, response)) {
reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response"));
return reactor;
void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
// Request the calls we're interested in.
// We do this before starting the serving thread, so that we know it's
// done before server startup is complete.
CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
// Start serving thread.
thread_->Start();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
void* tag;
bool ok;
while (true) {
if (!service->cq_->Next(&tag, &ok)) {
// The completion queue is shutting down.
GPR_ASSERT(service->shutdown_);
break;
}
auto* next_step = static_cast<CallableTag*>(tag);
next_step->Run(ok);
}
reactor->Finish(Status::OK);
return reactor;
}
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
@ -248,124 +254,245 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor
// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
//
DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor(
HealthCheckServiceImpl* service, const ByteBuffer* request)
: service_(service) {
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<CheckCallHandler>(cq, database, service);
CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
{
grpc::internal::MutexLock lock(&service_->mu_);
++service_->num_watches_;
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request a Check() call.
handler->next_ =
CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
&handler->writer_, cq, cq, &handler->next_);
}
bool success = DecodeRequest(*request, &service_name_);
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": watch call started", service_,
this, service_name_.c_str());
if (!success) {
MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request"));
}
DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// The value of ok being false means that the server is shutting down.
return;
}
// Register the call for updates to the service.
service_->database_->RegisterWatch(service_name_, Ref());
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Process request.
gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
this);
std::string service_name;
grpc::Status status = Status::OK;
ByteBuffer response;
if (!service_->DecodeRequest(request_, &service_name)) {
status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
} else {
ServingStatus serving_status = database_->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
status = Status(StatusCode::NOT_FOUND, "service name unknown");
} else if (!service_->EncodeResponse(serving_status, &response)) {
status = Status(StatusCode::INTERNAL, "could not encode response");
}
}
// Send response.
{
grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
next_ =
CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
if (status.ok()) {
writer_.Finish(response, status, &next_);
} else {
writer_.FinishWithError(status, &next_);
}
}
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealth(ServingStatus status) {
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
service_, this);
}
self.reset(); // To appease clang-tidy.
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
//
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<WatchCallHandler>(cq, database, service);
WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
{
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request AsyncNotifyWhenDone().
handler->on_done_notified_ =
CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
std::placeholders::_1, std::placeholders::_2),
self /* copies ref */);
handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
// Request a Watch() call.
handler->next_ =
CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
&handler->stream_, cq, cq,
&handler->next_);
}
}
DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// Server shutting down.
//
// AsyncNotifyWhenDone() needs to be called before the call starts, but the
// tag will not pop out if the call never starts (
// https://github.com/grpc/grpc/issues/10136). So we need to manually
// release the ownership of the handler in this case.
GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Parse request.
if (!service_->DecodeRequest(request_, &service_name_)) {
SendFinish(std::move(self),
Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
return;
}
// Register the call for updates to the service.
gpr_log(GPR_DEBUG,
"[HCS %p] watcher %p \"%s\": SendHealth() for ServingStatus %d",
service_, this, service_name_.c_str(), status);
grpc::internal::MutexLock lock(&mu_);
"[HCS %p] Health watch started for service \"%s\" (handler: %p)",
service_, service_name_.c_str(), this);
database_->RegisterCallHandler(service_name_, std::move(self));
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
grpc_core::MutexLock lock(&send_mu_);
// If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes.
if (write_pending_) {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": queuing write", service_,
this, service_name_.c_str());
if (send_in_flight_) {
pending_status_ = status;
return;
}
// Start a send.
SendHealthLocked(status);
SendHealthLocked(std::move(self), status);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealthLocked(ServingStatus status) {
// Do nothing if Finish() has already been called.
if (finish_called_) return;
// Check if we're shutting down.
{
grpc::internal::MutexLock lock(&service_->mu_);
if (service_->shutdown_) {
MaybeFinishLocked(
Status(StatusCode::CANCELLED, "not writing due to shutdown"));
return;
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
send_in_flight_ = true;
// Construct response.
ByteBuffer response;
bool success = service_->EncodeResponse(status, &response);
// Grab shutdown lock and send response.
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
SendFinishLocked(std::move(self), Status::CANCELLED);
return;
}
// Send response.
bool success = EncodeResponse(status, &response_);
if (!success) {
MaybeFinishLocked(
Status(StatusCode::INTERNAL, "could not encode response"));
SendFinishLocked(std::move(self),
Status(StatusCode::INTERNAL, "could not encode response"));
return;
}
gpr_log(GPR_DEBUG,
"[HCS %p] watcher %p \"%s\": starting write for ServingStatus %d",
service_, this, service_name_.c_str(), status);
write_pending_ = true;
StartWrite(&response_);
next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Write(response, &next_);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnWriteDone(bool ok) {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnWriteDone(): ok=%d",
service_, this, service_name_.c_str(), ok);
response_.Clear();
grpc::internal::MutexLock lock(&mu_);
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false"));
SendFinish(std::move(self), Status::CANCELLED);
return;
}
write_pending_ = false;
grpc_core::MutexLock lock(&send_mu_);
send_in_flight_ = false;
// If we got a new status since we started the last send, start a
// new send for it.
if (pending_status_ != NOT_FOUND) {
auto status = pending_status_;
pending_status_ = NOT_FOUND;
SendHealthLocked(status);
SendHealthLocked(std::move(self), status);
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnCancel() {
grpc::internal::MutexLock lock(&mu_);
MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()"));
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
if (finish_called_) return;
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) return;
SendFinishLocked(std::move(self), status);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": OnDone()", service_, this,
service_name_.c_str());
service_->database_->UnregisterWatch(service_name_, this);
{
grpc::internal::MutexLock lock(&service_->mu_);
if (--service_->num_watches_ == 0 && service_->shutdown_) {
service_->shutdown_condition_.Signal();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
on_finish_done_ =
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Finish(status, &on_finish_done_);
finish_called_ = true;
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG,
"[HCS %p] Health watch call finished (service_name: \"%s\", "
"handler: %p).",
service_, service_name_.c_str(), this);
}
// Free the initial ref from instantiation.
Unref();
self.reset(); // To appease clang-tidy.
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
MaybeFinishLocked(Status status) {
// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks. If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
GPR_ASSERT(ok);
gpr_log(GPR_DEBUG,
"[HCS %p] watcher %p \"%s\": MaybeFinishLocked() with code=%d msg=%s",
service_, this, service_name_.c_str(), status.error_code(),
status.error_message().c_str());
if (!finish_called_) {
gpr_log(GPR_DEBUG, "[HCS %p] watcher %p \"%s\": actually calling Finish()",
service_, this, service_name_.c_str());
finish_called_ = true;
Finish(status);
}
"[HCS %p] Health watch call is notified done (handler: %p, "
"is_cancelled: %d).",
service_, this, static_cast<int>(ctx_.IsCancelled()));
database_->UnregisterCallHandler(service_name_, self);
SendFinish(std::move(self), Status::CANCELLED);
}
} // namespace grpc

@ -19,25 +19,27 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#include <stddef.h>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include "absl/base/thread_annotations.h"
#include <grpc/support/log.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/impl/codegen/async_generic_service.h> // IWYU pragma: keep
#include <grpcpp/impl/codegen/async_unary_call.h> // IWYU pragma: keep
#include <grpcpp/impl/service_type.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
namespace grpc {
@ -50,55 +52,191 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
// Reactor for handling Watch streams.
class WatchReactor : public ServerWriteReactor<ByteBuffer>,
public grpc_core::RefCounted<WatchReactor> {
// Base class for call handlers.
class CallHandler {
public:
WatchReactor(HealthCheckServiceImpl* service, const ByteBuffer* request);
virtual ~CallHandler() = default;
virtual void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) = 0;
};
void SendHealth(ServingStatus status);
HealthCheckServiceImpl(DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq);
~HealthCheckServiceImpl() override;
void OnWriteDone(bool ok) override;
void OnCancel() override;
void OnDone() override;
void StartServingThread();
private:
// A tag that can be called with a bool argument. It's tailored for
// CallHandler's use. Before being used, it should be constructed with a
// method of CallHandler and a shared pointer to the handler. The
// shared pointer will be moved to the invoked function and the function
// can only be invoked once. That makes ref counting of the handler easier,
// because the shared pointer is not bound to the function and can be gone
// once the invoked function returns (if not used any more).
class CallableTag {
public:
using HandlerFunction =
std::function<void(std::shared_ptr<CallHandler>, bool)>;
CallableTag() {}
CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
: handler_function_(std::move(func)), handler_(std::move(handler)) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
}
// Runs the tag. This should be called only once. The handler is no
// longer owned by this tag after this method is invoked.
void Run(bool ok) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
handler_function_(std::move(handler_), ok);
}
// Releases and returns the shared pointer to the handler.
std::shared_ptr<CallHandler> ReleaseHandler() {
return std::move(handler_);
}
private:
void SendHealthLocked(ServingStatus status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
HandlerFunction handler_function_ = nullptr;
std::shared_ptr<CallHandler> handler_;
};
void MaybeFinishLocked(Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
// Call handler for Check method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class CheckCallHandler : public CallHandler {
public:
// Instantiates a CheckCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// Not used for Check.
void SendHealth(std::shared_ptr<CallHandler> /*self*/,
ServingStatus /*status*/) override {}
private:
// Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_;
std::string service_name_;
ByteBuffer response_;
grpc::internal::Mutex mu_;
bool write_pending_ ABSL_GUARDED_BY(mu_) = false;
ServingStatus pending_status_ ABSL_GUARDED_BY(mu_) = NOT_FOUND;
bool finish_called_ ABSL_GUARDED_BY(mu_) = false;
ByteBuffer request_;
GenericServerAsyncResponseWriter writer_;
ServerContext ctx_;
CallableTag next_;
};
explicit HealthCheckServiceImpl(DefaultHealthCheckService* database);
// Call handler for Watch method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class WatchCallHandler : public CallHandler {
public:
// Instantiates a WatchCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) override;
~HealthCheckServiceImpl() override;
private:
// Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
private:
// Request handler for Check method.
static ServerUnaryReactor* HandleCheckRequest(
DefaultHealthCheckService* database, CallbackServerContext* context,
const ByteBuffer* request, ByteBuffer* response);
// Requires holding send_mu_.
void SendHealthLocked(std::shared_ptr<CallHandler> self,
ServingStatus status);
// When sending a health result finishes.
void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
// Requires holding service_->cq_shutdown_mu_.
void SendFinishLocked(std::shared_ptr<CallHandler> self,
const Status& status);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// Called when AsyncNotifyWhenDone() notifies us.
void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_;
ByteBuffer request_;
std::string service_name_;
GenericServerAsyncWriter stream_;
ServerContext ctx_;
grpc_core::Mutex send_mu_;
bool send_in_flight_ = false; // Guarded by mu_.
ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
bool finish_called_ = false;
CallableTag next_;
CallableTag on_done_notified_;
CallableTag on_finish_done_;
};
// Handles the incoming requests and drives the completion queue in a loop.
static void Serve(void* arg);
// Returns true on success.
static bool DecodeRequest(const ByteBuffer& request,
std::string* service_name);
static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
// Needed to appease Windows compilers, which don't seem to allow
// nested classes to access protected members in the parent's
// superclass.
using Service::RequestAsyncServerStreaming;
using Service::RequestAsyncUnary;
DefaultHealthCheckService* database_;
std::unique_ptr<ServerCompletionQueue> cq_;
grpc::internal::Mutex mu_;
grpc::internal::CondVar shutdown_condition_;
bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
size_t num_watches_ ABSL_GUARDED_BY(mu_) = 0;
// To synchronize the operations related to shutdown state of cq_, so that
// we don't enqueue new tags into cq_ after it is already shut down.
grpc_core::Mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false};
std::unique_ptr<grpc_core::Thread> thread_;
};
DefaultHealthCheckService();
@ -110,7 +248,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
ServingStatus GetServingStatus(const std::string& service_name) const;
HealthCheckServiceImpl* GetHealthCheckService();
HealthCheckServiceImpl* GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq);
private:
// Stores the current serving status of a service and any call
@ -119,28 +258,31 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
void SetServingStatus(ServingStatus status);
ServingStatus GetServingStatus() const { return status_; }
void AddWatch(
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher);
void RemoveWatch(HealthCheckServiceImpl::WatchReactor* watcher);
bool Unused() const { return watchers_.empty() && status_ == NOT_FOUND; }
void AddCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void RemoveCallHandler(
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
bool Unused() const {
return call_handlers_.empty() && status_ == NOT_FOUND;
}
private:
ServingStatus status_ = NOT_FOUND;
std::map<HealthCheckServiceImpl::WatchReactor*,
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor>>
watchers_;
std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
call_handlers_;
};
void RegisterWatch(
void RegisterCallHandler(
const std::string& service_name,
grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher);
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void UnregisterWatch(const std::string& service_name,
HealthCheckServiceImpl::WatchReactor* watcher);
void UnregisterCallHandler(
const std::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
mutable grpc::internal::Mutex mu_;
bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
std::map<std::string, ServiceData> services_map_ ABSL_GUARDED_BY(&mu_);
mutable grpc_core::Mutex mu_;
bool shutdown_ = false; // Guarded by mu_.
std::map<std::string, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_;
};

@ -1058,7 +1058,6 @@ bool Server::RegisterService(const std::string* addr, grpc::Service* service) {
has_callback_methods_ = true;
grpc::internal::RpcServiceMethod* method_value = method.get();
grpc::CompletionQueue* cq = CallbackCQ();
grpc_server_register_completion_queue(server_, cq->cq(), nullptr);
grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator(
cq->cq(), method_registration_tag, [this, cq, method_value] {
grpc_core::Server::RegisteredCallAllocation result;
@ -1150,45 +1149,58 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
grpc::ServerCompletionQueue* health_check_cq = nullptr;
grpc::DefaultHealthCheckService::HealthCheckServiceImpl*
default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
grpc::DefaultHealthCheckServiceEnabled()) {
auto default_hc_service = absl::make_unique<DefaultHealthCheckService>();
auto* hc_service_impl = default_hc_service->GetHealthCheckService();
health_check_service_ = std::move(default_hc_service);
RegisterService(nullptr, hc_service_impl);
auto* default_hc_service = new grpc::DefaultHealthCheckService;
health_check_service_.reset(default_hc_service);
// We create a non-polling CQ to avoid impacting application
// performance. This ensures that we don't introduce thread hops
// for application requests that wind up on this CQ, which is polled
// in its own thread.
health_check_cq = new grpc::ServerCompletionQueue(
GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr);
grpc_server_register_completion_queue(server_, health_check_cq->cq(),
nullptr);
default_health_check_service_impl =
default_hc_service->GetHealthCheckService(
std::unique_ptr<grpc::ServerCompletionQueue>(health_check_cq));
RegisterService(nullptr, default_health_check_service_impl);
}
for (auto& acceptor : acceptors_) {
acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_);
}
// If this server uses callback methods, then create a callback generic
// service to handle any unimplemented methods using the default reactor
// creator
if (has_callback_methods_ && !has_callback_generic_service_) {
unimplemented_service_ = absl::make_unique<grpc::CallbackGenericService>();
RegisterCallbackGenericService(unimplemented_service_.get());
}
#ifndef NDEBUG
for (size_t i = 0; i < num_cqs; i++) {
cq_list_.push_back(cqs[i]);
}
#endif
// We must have exactly one generic service to handle requests for
// unmatched method names (i.e., to return UNIMPLEMENTED for any RPC
// method for which we don't have a registered implementation). This
// service comes from one of the following places (first match wins):
// - If the application supplied a generic service via either the async
// or callback APIs, we use that.
// - If there are callback methods, register a callback generic service.
// - If there are sync methods, register a sync generic service.
// (This must be done before server start to initialize an
// AllocatingRequestMatcher.)
// - Otherwise (we have only async methods), we wait until the server
// is started and then start an UnimplementedAsyncRequest on each
// async CQ, so that the requests will be moved along by polling
// done in application threads.
// If we have a generic service, all unmatched method names go there.
// Otherwise, we must provide at least one RPC request for an "unimplemented"
// RPC, which covers any RPC for a method name that isn't matched. If we
// have a sync service, let it be a sync unimplemented RPC, which must be
// registered before server start (to initialize an AllocatingRequestMatcher).
// If we have an AllocatingRequestMatcher, we can't also specify other
// unimplemented RPCs via explicit async requests, so we won't do so. If we
// only have async services, we can specify unimplemented RPCs on each async
// CQ so that some user polling thread will move them along as long as some
// progress is being made on any RPCs in the system.
bool unknown_rpc_needed =
!has_async_generic_service_ && !has_callback_generic_service_;
if (unknown_rpc_needed && has_callback_methods_) {
unimplemented_service_ = absl::make_unique<grpc::CallbackGenericService>();
RegisterCallbackGenericService(unimplemented_service_.get());
unknown_rpc_needed = false;
}
if (unknown_rpc_needed && !sync_req_mgrs_.empty()) {
sync_req_mgrs_[0]->AddUnknownSyncMethod();
unknown_rpc_needed = false;
@ -1202,6 +1214,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
if (health_check_cq != nullptr) {
new UnimplementedAsyncRequest(this, health_check_cq);
}
unknown_rpc_needed = false;
}
@ -1218,6 +1233,10 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
value->Start();
}
if (default_health_check_service_impl != nullptr) {
default_health_check_service_impl->StartServingThread();
}
for (auto& acceptor : acceptors_) {
acceptor->Start();
}

Loading…
Cancel
Save