Merge pull request #16574 from markdroth/health_checking_service

Second attempt: Implement Watch method in health check service.
pull/16581/head
Mark D. Roth 6 years ago committed by GitHub
commit a8247c833c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      include/grpcpp/impl/codegen/completion_queue.h
  2. 484
      src/cpp/server/health/default_health_check_service.cc
  3. 242
      src/cpp/server/health/default_health_check_service.h
  4. 1
      src/cpp/server/health/health.pb.c
  5. 7
      src/cpp/server/health/health.pb.h
  6. 27
      src/cpp/server/server_cc.cc
  7. 20
      src/proto/grpc/health/v1/health.proto
  8. 76
      test/cpp/end2end/health_service_end2end_test.cc
  9. 18
      tools/distrib/check_nanopb_output.sh

@ -384,6 +384,7 @@ class ServerCompletionQueue : public CompletionQueue {
grpc_cq_polling_type polling_type_;
friend class ServerBuilder;
friend class Server;
};
} // namespace grpc

@ -30,29 +30,162 @@
#include "src/cpp/server/health/health.pb.h"
namespace grpc {
//
// DefaultHealthCheckService
//
DefaultHealthCheckService::DefaultHealthCheckService() {
services_map_[""].SetServingStatus(SERVING);
}
void DefaultHealthCheckService::SetServingStatus(
const grpc::string& service_name, bool serving) {
std::unique_lock<std::mutex> lock(mu_);
services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
}
void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING;
std::unique_lock<std::mutex> lock(mu_);
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
service_data.SetServingStatus(status);
}
}
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
const grpc::string& service_name) const {
std::lock_guard<std::mutex> lock(mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) {
return NOT_FOUND;
}
const ServiceData& service_data = it->second;
return service_data.GetServingStatus();
}
void DefaultHealthCheckService::RegisterCallHandler(
const grpc::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
std::unique_lock<std::mutex> lock(mu_);
ServiceData& service_data = services_map_[service_name];
service_data.AddCallHandler(handler /* copies ref */);
handler->SendHealth(std::move(handler), service_data.GetServingStatus());
}
void DefaultHealthCheckService::UnregisterCallHandler(
const grpc::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
std::unique_lock<std::mutex> lock(mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return;
ServiceData& service_data = it->second;
service_data.RemoveCallHandler(std::move(handler));
if (service_data.Unused()) {
services_map_.erase(it);
}
}
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq) {
GPR_ASSERT(impl_ == nullptr);
impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
return impl_.get();
}
//
// DefaultHealthCheckService::ServiceData
//
void DefaultHealthCheckService::ServiceData::SetServingStatus(
ServingStatus status) {
status_ = status;
for (auto& call_handler : call_handlers_) {
call_handler->SendHealth(call_handler /* copies ref */, status);
}
}
void DefaultHealthCheckService::ServiceData::AddCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
call_handlers_.insert(std::move(handler));
}
void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
call_handlers_.erase(std::move(handler));
}
//
// DefaultHealthCheckService::HealthCheckServiceImpl
//
namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* service)
: service_(service), method_(nullptr) {
internal::MethodHandler* handler =
new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
ByteBuffer>(
std::mem_fn(&HealthCheckServiceImpl::Check), this);
method_ = new internal::RpcServiceMethod(
kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
AddMethod(method_);
}
Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
// Decode request.
std::vector<Slice> slices;
if (!request->Dump(&slices).ok()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq)
: database_(database), cq_(std::move(cq)) {
// Add Check() method.
check_method_ = new internal::RpcServiceMethod(
kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr);
AddMethod(check_method_);
// Add Watch() method.
watch_method_ = new internal::RpcServiceMethod(
kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr);
AddMethod(watch_method_);
// Create serving thread.
thread_ = std::unique_ptr<::grpc_core::Thread>(
new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
}
DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
// We will reach here after the server starts shutting down.
shutdown_ = true;
{
std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
cq_->Shutdown();
}
thread_->Join();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
thread_->Start();
}
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
HealthCheckServiceImpl* service =
reinterpret_cast<HealthCheckServiceImpl*>(arg);
// TODO(juanlishen): This is a workaround to wait for the cq to be ready.
// Need to figure out why cq is not ready after service starts.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(1, GPR_TIMESPAN)));
CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_,
service);
WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_,
service);
void* tag;
bool ok;
while (true) {
if (!service->cq_->Next(&tag, &ok)) {
// The completion queue is shutting down.
GPR_ASSERT(service->shutdown_);
break;
}
auto* next_step = static_cast<CallableTag*>(tag);
next_step->Run(ok);
}
}
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
const ByteBuffer& request, grpc::string* service_name) {
std::vector<Slice> slices;
if (!request.Dump(&slices).ok()) return false;
uint8_t* request_bytes = nullptr;
bool request_bytes_owned = false;
size_t request_size = 0;
@ -64,14 +197,13 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
request_size = slices[0].size();
} else {
request_bytes_owned = true;
request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
uint8_t* copy_to = request_bytes;
for (size_t i = 0; i < slices.size(); i++) {
memcpy(copy_to, slices[i].begin(), slices[i].size());
copy_to += slices[i].size();
}
}
if (request_bytes != nullptr) {
pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
bool decode_status = pb_decode(
@ -79,26 +211,22 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
if (request_bytes_owned) {
gpr_free(request_bytes);
}
if (!decode_status) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
}
// Check status from the associated default health checking service.
DefaultHealthCheckService::ServingStatus serving_status =
service_->GetServingStatus(
request_struct.has_service ? request_struct.service : "");
if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
return Status(StatusCode::NOT_FOUND, "");
if (!decode_status) return false;
}
*service_name = request_struct.has_service ? request_struct.service : "";
return true;
}
// Encode response
bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
ServingStatus status, ByteBuffer* response) {
grpc_health_v1_HealthCheckResponse response_struct;
response_struct.has_status = true;
response_struct.status =
serving_status == DefaultHealthCheckService::SERVING
? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
: grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
status == NOT_FOUND
? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
: status == SERVING
? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
: grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
pb_ostream_t ostream;
memset(&ostream, 0, sizeof(ostream));
pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
@ -108,48 +236,282 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
GRPC_SLICE_LENGTH(response_slice));
bool encode_status = pb_encode(
&ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
if (!encode_status) {
return Status(StatusCode::INTERNAL, "Failed to encode response.");
}
if (!encode_status) return false;
Slice encoded_response(response_slice, Slice::STEAL_REF);
ByteBuffer response_buffer(&encoded_response, 1);
response->Swap(&response_buffer);
return Status::OK;
return true;
}
DefaultHealthCheckService::DefaultHealthCheckService() {
services_map_.emplace("", true);
//
// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
//
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<CheckCallHandler>(cq, database, service);
CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
{
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request a Check() call.
handler->next_ =
CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
&handler->writer_, cq, cq, &handler->next_);
}
}
void DefaultHealthCheckService::SetServingStatus(
const grpc::string& service_name, bool serving) {
std::lock_guard<std::mutex> lock(mu_);
services_map_[service_name] = serving;
DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok) {
// The value of ok being false means that the server is shutting down.
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Process request.
gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
this);
grpc::string service_name;
grpc::Status status = Status::OK;
ByteBuffer response;
if (!service_->DecodeRequest(request_, &service_name)) {
status = Status(StatusCode::INVALID_ARGUMENT, "");
} else {
ServingStatus serving_status = database_->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
status = Status(StatusCode::NOT_FOUND, "service name unknown");
} else if (!service_->EncodeResponse(serving_status, &response)) {
status = Status(StatusCode::INTERNAL, "");
}
}
// Send response.
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
next_ =
CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
if (status.ok()) {
writer_.Finish(response, status, &next_);
} else {
writer_.FinishWithError(status, &next_);
}
}
}
}
void DefaultHealthCheckService::SetServingStatus(bool serving) {
std::lock_guard<std::mutex> lock(mu_);
for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
iter->second = serving;
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
service_, this);
}
}
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
const grpc::string& service_name) const {
std::lock_guard<std::mutex> lock(mu_);
const auto& iter = services_map_.find(service_name);
if (iter == services_map_.end()) {
return NOT_FOUND;
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
//
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service) {
std::shared_ptr<CallHandler> self =
std::make_shared<WatchCallHandler>(cq, database, service);
WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
{
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request AsyncNotifyWhenDone().
handler->on_done_notified_ =
CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
std::placeholders::_1, std::placeholders::_2),
self /* copies ref */);
handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
// Request a Watch() call.
handler->next_ =
CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
&handler->stream_, cq, cq,
&handler->next_);
}
return iter->second ? SERVING : NOT_SERVING;
}
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService() {
GPR_ASSERT(impl_ == nullptr);
impl_.reset(new HealthCheckServiceImpl(this));
return impl_.get();
DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service)
: cq_(cq),
database_(database),
service_(service),
stream_(&ctx_),
call_state_(WAITING_FOR_CALL) {}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
call_state_ = CALL_RECEIVED;
} else {
// AsyncNotifyWhenDone() needs to be called before the call starts, but the
// tag will not pop out if the call never starts (
// https://github.com/grpc/grpc/issues/10136). So we need to manually
// release the ownership of the handler in this case.
GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
}
if (!ok || shutdown_) {
// The value of ok being false means that the server is shutting down.
Shutdown(std::move(self), "OnCallReceived");
return;
}
// Spawn a new handler instance to serve the next new client. Every handler
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Parse request.
if (!service_->DecodeRequest(request_, &service_name_)) {
on_finish_done_ =
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Finish(Status(StatusCode::INVALID_ARGUMENT, ""), &on_finish_done_);
call_state_ = FINISH_CALLED;
return;
}
// Register the call for updates to the service.
gpr_log(GPR_DEBUG,
"[HCS %p] Health check watch started for service \"%s\" "
"(handler: %p)",
service_, service_name_.c_str(), this);
database_->RegisterCallHandler(service_name_, std::move(self));
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
std::unique_lock<std::mutex> lock(mu_);
// If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes.
if (send_in_flight_) {
pending_status_ = status;
return;
}
// Start a send.
SendHealthLocked(std::move(self), status);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
if (service_->shutdown_) {
cq_lock.release()->unlock();
Shutdown(std::move(self), "SendHealthLocked");
return;
}
send_in_flight_ = true;
call_state_ = SEND_MESSAGE_PENDING;
// Construct response.
ByteBuffer response;
if (!service_->EncodeResponse(status, &response)) {
on_finish_done_ =
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Finish(Status(StatusCode::INTERNAL, ""), &on_finish_done_);
return;
}
next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
stream_.Write(response, &next_);
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
if (!ok || shutdown_) {
Shutdown(std::move(self), "OnSendHealthDone");
return;
}
call_state_ = CALL_RECEIVED;
{
std::unique_lock<std::mutex> lock(mu_);
send_in_flight_ = false;
// If we got a new status since we started the last send, start a
// new send for it.
if (pending_status_ != NOT_FOUND) {
auto status = pending_status_;
pending_status_ = NOT_FOUND;
SendHealthLocked(std::move(self), status);
}
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
GPR_ASSERT(ok);
done_notified_ = true;
if (ctx_.IsCancelled()) {
is_cancelled_ = true;
}
gpr_log(GPR_DEBUG,
"[HCS %p] Healt check call is notified done (handler: %p, "
"is_cancelled: %d).",
service_, this, static_cast<int>(is_cancelled_));
Shutdown(std::move(self), "OnDoneNotified");
}
// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks. If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
Shutdown(std::shared_ptr<CallHandler> self, const char* reason) {
if (!shutdown_) {
gpr_log(GPR_DEBUG,
"[HCS %p] Shutting down the handler (service_name: \"%s\", "
"handler: %p, reason: %s).",
service_, service_name_.c_str(), this, reason);
shutdown_ = true;
}
// OnCallReceived() may be called after OnDoneNotified(), so we need to
// try to Finish() every time we are in Shutdown().
if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
on_finish_done_ =
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
std::placeholders::_1, std::placeholders::_2),
std::move(self));
// TODO(juanlishen): Maybe add a message proto for the client to
// explicitly cancel the stream so that we can return OK status in such
// cases.
stream_.Finish(Status::CANCELLED, &on_finish_done_);
call_state_ = FINISH_CALLED;
}
}
}
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
if (ok) {
gpr_log(GPR_DEBUG,
"[HCS %p] Health check call finished (service_name: \"%s\", "
"handler: %p).",
service_, service_name_.c_str(), this);
}
}
} // namespace grpc

@ -19,42 +19,268 @@
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#include <atomic>
#include <mutex>
#include <set>
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
#include "src/core/lib/gprpp/thd.h"
namespace grpc {
// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
explicit HealthCheckServiceImpl(DefaultHealthCheckService* service);
// Base class for call handlers.
class CallHandler {
public:
virtual ~CallHandler() = default;
virtual void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) = 0;
};
Status Check(ServerContext* context, const ByteBuffer* request,
ByteBuffer* response);
HealthCheckServiceImpl(DefaultHealthCheckService* database,
std::unique_ptr<ServerCompletionQueue> cq);
~HealthCheckServiceImpl();
void StartServingThread();
private:
const DefaultHealthCheckService* const service_;
internal::RpcServiceMethod* method_;
// A tag that can be called with a bool argument. It's tailored for
// CallHandler's use. Before being used, it should be constructed with a
// method of CallHandler and a shared pointer to the handler. The
// shared pointer will be moved to the invoked function and the function
// can only be invoked once. That makes ref counting of the handler easier,
// because the shared pointer is not bound to the function and can be gone
// once the invoked function returns (if not used any more).
class CallableTag {
public:
using HandlerFunction =
std::function<void(std::shared_ptr<CallHandler>, bool)>;
CallableTag() {}
CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
: handler_function_(std::move(func)), handler_(std::move(handler)) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
}
// Runs the tag. This should be called only once. The handler is no
// longer owned by this tag after this method is invoked.
void Run(bool ok) {
GPR_ASSERT(handler_function_ != nullptr);
GPR_ASSERT(handler_ != nullptr);
handler_function_(std::move(handler_), ok);
}
// Releases and returns the shared pointer to the handler.
std::shared_ptr<CallHandler> ReleaseHandler() {
return std::move(handler_);
}
private:
HandlerFunction handler_function_ = nullptr;
std::shared_ptr<CallHandler> handler_;
};
// Call handler for Check method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class CheckCallHandler : public CallHandler {
public:
// Instantiates a CheckCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
CheckCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// Not used for Check.
void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) override {}
private:
// Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_;
ByteBuffer request_;
GenericServerAsyncResponseWriter writer_;
ServerContext ctx_;
CallableTag next_;
};
// Call handler for Watch method.
// Each handler takes care of one call. It contains per-call data and it
// will access the members of the parent class (i.e.,
// DefaultHealthCheckService) for per-service health data.
class WatchCallHandler : public CallHandler {
public:
// Instantiates a WatchCallHandler and requests the next health check
// call. The handler object will manage its own lifetime, so no action is
// needed from the caller any more regarding that object.
static void CreateAndStart(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
// This ctor is public because we want to use std::make_shared<> in
// CreateAndStart(). This ctor shouldn't be used elsewhere.
WatchCallHandler(ServerCompletionQueue* cq,
DefaultHealthCheckService* database,
HealthCheckServiceImpl* service);
void SendHealth(std::shared_ptr<CallHandler> self,
ServingStatus status) override;
private:
// Called when we receive a call.
// Spawns a new handler so that we can keep servicing future calls.
void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
// Requires holding mu_.
void SendHealthLocked(std::shared_ptr<CallHandler> self,
ServingStatus status);
// When sending a health result finishes.
void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
// Called when Finish() is done.
void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
// Called when AsyncNotifyWhenDone() notifies us.
void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
void Shutdown(std::shared_ptr<CallHandler> self, const char* reason);
// The members passed down from HealthCheckServiceImpl.
ServerCompletionQueue* cq_;
DefaultHealthCheckService* database_;
HealthCheckServiceImpl* service_;
ByteBuffer request_;
grpc::string service_name_;
GenericServerAsyncWriter stream_;
ServerContext ctx_;
std::mutex mu_;
bool send_in_flight_ = false; // Guarded by mu_.
ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
// The state of the RPC progress.
enum CallState {
WAITING_FOR_CALL,
CALL_RECEIVED,
SEND_MESSAGE_PENDING,
FINISH_CALLED
} call_state_;
bool shutdown_ = false;
bool done_notified_ = false;
bool is_cancelled_ = false;
CallableTag next_;
CallableTag on_done_notified_;
CallableTag on_finish_done_;
};
// Handles the incoming requests and drives the completion queue in a loop.
static void Serve(void* arg);
// Returns true on success.
static bool DecodeRequest(const ByteBuffer& request,
grpc::string* service_name);
static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
// Needed to appease Windows compilers, which don't seem to allow
// nested classes to access protected members in the parent's
// superclass.
using Service::RequestAsyncServerStreaming;
using Service::RequestAsyncUnary;
DefaultHealthCheckService* database_;
std::unique_ptr<ServerCompletionQueue> cq_;
internal::RpcServiceMethod* check_method_;
internal::RpcServiceMethod* watch_method_;
// To synchronize the operations related to shutdown state of cq_, so that
// we don't enqueue new tags into cq_ after it is already shut down.
std::mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false};
std::unique_ptr<::grpc_core::Thread> thread_;
};
DefaultHealthCheckService();
void SetServingStatus(const grpc::string& service_name,
bool serving) override;
void SetServingStatus(bool serving) override;
enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
ServingStatus GetServingStatus(const grpc::string& service_name) const;
HealthCheckServiceImpl* GetHealthCheckService();
HealthCheckServiceImpl* GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue> cq);
private:
// Stores the current serving status of a service and any call
// handlers registered for updates when the service's status changes.
class ServiceData {
public:
void SetServingStatus(ServingStatus status);
ServingStatus GetServingStatus() const { return status_; }
void AddCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void RemoveCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
bool Unused() const {
return call_handlers_.empty() && status_ == NOT_FOUND;
}
private:
ServingStatus status_ = NOT_FOUND;
std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
call_handlers_;
};
void RegisterCallHandler(
const grpc::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void UnregisterCallHandler(
const grpc::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
mutable std::mutex mu_;
std::map<grpc::string, bool> services_map_;
std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_;
};

@ -2,7 +2,6 @@
/* Generated by nanopb-0.3.7-dev */
#include "src/cpp/server/health/health.pb.h"
/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.

@ -17,11 +17,12 @@ extern "C" {
typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus {
grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0,
grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1,
grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2
grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2,
grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3
} grpc_health_v1_HealthCheckResponse_ServingStatus;
#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN
#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING
#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1))
#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1))
/* Struct definitions */
typedef struct _grpc_health_v1_HealthCheckRequest {

@ -559,16 +559,20 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
ServerCompletionQueue* health_check_cq = nullptr;
DefaultHealthCheckService::HealthCheckServiceImpl*
default_health_check_service_impl = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
DefaultHealthCheckServiceEnabled()) {
if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
gpr_log(GPR_INFO,
"Default health check service disabled at async-only server.");
} else {
auto* default_hc_service = new DefaultHealthCheckService;
health_check_service_.reset(default_hc_service);
RegisterService(nullptr, default_hc_service->GetHealthCheckService());
}
auto* default_hc_service = new DefaultHealthCheckService;
health_check_service_.reset(default_hc_service);
health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING);
grpc_server_register_completion_queue(server_, health_check_cq->cq(),
nullptr);
default_health_check_service_impl =
default_hc_service->GetHealthCheckService(
std::unique_ptr<ServerCompletionQueue>(health_check_cq));
RegisterService(nullptr, default_health_check_service_impl);
}
grpc_server_start(server_);
@ -583,6 +587,9 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
if (health_check_cq != nullptr) {
new UnimplementedAsyncRequest(this, health_check_cq);
}
}
// If this server has any support for synchronous methods (has any sync
@ -595,6 +602,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
if (default_health_check_service_impl != nullptr) {
default_health_check_service_impl->StartServingThread();
}
}
void Server::ShutdownInternal(gpr_timespec deadline) {

@ -34,10 +34,30 @@ message HealthCheckResponse {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
service Health {
// If the requested service is unknown, the call will fail with status
// NOT_FOUND.
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
// Performs a watch for the serving status of the requested service.
// The server will immediately send back a message indicating the current
// serving status. It will then subsequently send a new message whenever
// the service's serving status changes.
//
// If the requested service is unknown when the call is received, the
// server will send a message setting the serving status to
// SERVICE_UNKNOWN but will *not* terminate the call. If at some
// future point, the serving status of the service becomes known, the
// server will send a new message with the service's serving status.
//
// If the call terminates with status UNIMPLEMENTED, then clients
// should assume this method is not supported and should not retry the
// call. If the call terminates with any other status (including OK),
// clients should retry the call with appropriate exponential backoff.
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

@ -64,6 +64,29 @@ class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service {
return Status::OK;
}
Status Watch(ServerContext* context, const HealthCheckRequest* request,
::grpc::ServerWriter<HealthCheckResponse>* writer) override {
auto last_state = HealthCheckResponse::UNKNOWN;
while (!context->IsCancelled()) {
{
std::lock_guard<std::mutex> lock(mu_);
HealthCheckResponse response;
auto iter = status_map_.find(request->service());
if (iter == status_map_.end()) {
response.set_status(response.SERVICE_UNKNOWN);
} else {
response.set_status(iter->second);
}
if (response.status() != last_state) {
writer->Write(response, ::grpc::WriteOptions());
}
}
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(1000, GPR_TIMESPAN)));
}
return Status::OK;
}
void SetStatus(const grpc::string& service_name,
HealthCheckResponse::ServingStatus status) {
std::lock_guard<std::mutex> lock(mu_);
@ -106,14 +129,6 @@ class CustomHealthCheckService : public HealthCheckServiceInterface {
HealthCheckServiceImpl* impl_; // not owned
};
void LoopCompletionQueue(ServerCompletionQueue* cq) {
void* tag;
bool ok;
while (cq->Next(&tag, &ok)) {
abort(); // Nothing should come out of the cq.
}
}
class HealthServiceEnd2endTest : public ::testing::Test {
protected:
HealthServiceEnd2endTest() {}
@ -218,6 +233,33 @@ class HealthServiceEnd2endTest : public ::testing::Test {
Status(StatusCode::NOT_FOUND, ""));
}
void VerifyHealthCheckServiceStreaming() {
const grpc::string kServiceName("service_name");
HealthCheckServiceInterface* service = server_->GetHealthCheckService();
// Start Watch for service.
ClientContext context;
HealthCheckRequest request;
request.set_service(kServiceName);
std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader =
hc_stub_->Watch(&context, request);
// Initial response will be SERVICE_UNKNOWN.
HealthCheckResponse response;
EXPECT_TRUE(reader->Read(&response));
EXPECT_EQ(response.SERVICE_UNKNOWN, response.status());
response.Clear();
// Now set service to NOT_SERVING and make sure we get an update.
service->SetServingStatus(kServiceName, false);
EXPECT_TRUE(reader->Read(&response));
EXPECT_EQ(response.NOT_SERVING, response.status());
response.Clear();
// Now set service to SERVING and make sure we get another update.
service->SetServingStatus(kServiceName, true);
EXPECT_TRUE(reader->Read(&response));
EXPECT_EQ(response.SERVING, response.status());
// Finish call.
context.TryCancel();
}
TestServiceImpl echo_test_service_;
HealthCheckServiceImpl health_check_service_impl_;
std::unique_ptr<Health::Stub> hc_stub_;
@ -245,6 +287,7 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
SetUpServer(true, false, false, nullptr);
VerifyHealthCheckService();
VerifyHealthCheckServiceStreaming();
// The default service has a size limit of the service name.
const grpc::string kTooLongServiceName(201, 'x');
@ -252,22 +295,6 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
Status(StatusCode::INVALID_ARGUMENT, ""));
}
// The server has no sync service.
TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsyncOnly) {
EnableDefaultHealthCheckService(true);
EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
SetUpServer(false, true, false, nullptr);
cq_thread_ = std::thread(LoopCompletionQueue, cq_.get());
HealthCheckServiceInterface* default_service =
server_->GetHealthCheckService();
EXPECT_TRUE(default_service == nullptr);
ResetStubs();
SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, ""));
}
// Provide an empty service to disable the default service.
TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) {
EnableDefaultHealthCheckService(true);
@ -296,6 +323,7 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) {
ResetStubs();
VerifyHealthCheckService();
VerifyHealthCheckServiceStreaming();
}
} // namespace

@ -16,6 +16,7 @@
set -ex
readonly NANOPB_ALTS_TMP_OUTPUT="$(mktemp -d)"
readonly NANOPB_HEALTH_TMP_OUTPUT="$(mktemp -d)"
readonly NANOPB_TMP_OUTPUT="$(mktemp -d)"
readonly PROTOBUF_INSTALL_PREFIX="$(mktemp -d)"
@ -67,6 +68,23 @@ if ! diff -r "$NANOPB_TMP_OUTPUT" src/core/ext/filters/client_channel/lb_policy/
exit 2
fi
#
# checks for health.proto
#
readonly HEALTH_GRPC_OUTPUT_PATH='src/cpp/server/health'
# nanopb-compile the proto to a temp location
./tools/codegen/core/gen_nano_proto.sh \
src/proto/grpc/health/v1/health.proto \
"$NANOPB_HEALTH_TMP_OUTPUT" \
"$HEALTH_GRPC_OUTPUT_PATH"
# compare outputs to checked compiled code
for NANOPB_OUTPUT_FILE in $NANOPB_HEALTH_TMP_OUTPUT/*.pb.*; do
if ! diff "$NANOPB_OUTPUT_FILE" "src/cpp/server/health/$(basename $NANOPB_OUTPUT_FILE)"; then
echo "Outputs differ: $NANOPB_HEALTH_TMP_OUTPUT vs $HEALTH_GRPC_OUTPUT_PATH"
exit 2
fi
done
#
# Checks for handshaker.proto and transport_security_common.proto
#

Loading…
Cancel
Save