pull/9205/head
yang-g 8 years ago
parent a3c95529c7
commit ef16328063
  1. 65
      src/cpp/server/health/default_health_check_service.cc
  2. 41
      src/cpp/server/health/default_health_check_service.h
  3. 22
      src/cpp/server/server_cc.cc

@ -45,12 +45,25 @@
namespace grpc {
namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* service, bool sync)
: service_(service), method_(nullptr), sync_(sync) {
MethodHandler* handler = nullptr;
if (sync_) {
handler =
new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
std::mem_fn(&HealthCheckServiceImpl::Check), this);
}
method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC,
handler);
AddMethod(method_);
}
Status CheckHealth(const DefaultHealthCheckService* service,
ServerContext* context, const ByteBuffer* request,
ByteBuffer* response) {
Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
// Decode request.
std::vector<Slice> slices;
request->Dump(&slices);
@ -87,7 +100,7 @@ Status CheckHealth(const DefaultHealthCheckService* service,
// Check status from the associated default health checking service.
DefaultHealthCheckService::ServingStatus serving_status =
service->GetServingStatus(
service_->GetServingStatus(
request_struct.has_service ? request_struct.service : "");
if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
return Status(StatusCode::NOT_FOUND, "");
@ -117,41 +130,8 @@ Status CheckHealth(const DefaultHealthCheckService* service,
response->Swap(&response_buffer);
return Status::OK;
}
} // namespace
DefaultHealthCheckService::SyncHealthCheckServiceImpl::
SyncHealthCheckServiceImpl(DefaultHealthCheckService* service)
: service_(service) {
auto* handler =
new RpcMethodHandler<SyncHealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
std::mem_fn(&SyncHealthCheckServiceImpl::Check), this);
auto* method = new RpcServiceMethod(kHealthCheckMethodName,
RpcMethod::NORMAL_RPC, handler);
AddMethod(method);
}
Status DefaultHealthCheckService::SyncHealthCheckServiceImpl::Check(
ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
return CheckHealth(service_, context, request, response);
}
DefaultHealthCheckService::AsyncHealthCheckServiceImpl::
AsyncHealthCheckServiceImpl(DefaultHealthCheckService* service)
: service_(service) {
auto* method = new RpcServiceMethod(kHealthCheckMethodName,
RpcMethod::NORMAL_RPC, nullptr);
AddMethod(method);
method_ = method;
}
Status DefaultHealthCheckService::AsyncHealthCheckServiceImpl::Check(
ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
return CheckHealth(service_, context, request, response);
}
DefaultHealthCheckService::DefaultHealthCheckService()
: sync_service_(new SyncHealthCheckServiceImpl(this)),
async_service_(new AsyncHealthCheckServiceImpl(this)) {
DefaultHealthCheckService::DefaultHealthCheckService() {
services_map_.emplace("", true);
}
@ -179,4 +159,11 @@ DefaultHealthCheckService::GetServingStatus(
return iter->second ? SERVING : NOT_SERVING;
}
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService(bool sync) {
GPR_ASSERT(impl_ == nullptr);
impl_.reset(new HealthCheckServiceImpl(this, sync));
return impl_.get();
}
} // namespace grpc

@ -44,47 +44,40 @@ namespace grpc {
// Default implementation of HealthCheckServiceInterface. Server will create and
// own it.
class DefaultHealthCheckService : public HealthCheckServiceInterface {
class DefaultHealthCheckService final : public HealthCheckServiceInterface {
public:
class SyncHealthCheckServiceImpl : public Service {
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
explicit SyncHealthCheckServiceImpl(DefaultHealthCheckService* service);
HealthCheckServiceImpl(DefaultHealthCheckService* service, bool sync);
Status Check(ServerContext* context, const ByteBuffer* request,
ByteBuffer* response);
private:
const DefaultHealthCheckService* service_;
};
bool sync() { return sync_; }
class AsyncHealthCheckServiceImpl : public Service {
public:
explicit AsyncHealthCheckServiceImpl(DefaultHealthCheckService* service);
Status Check(ServerContext* context, const ByteBuffer* request,
ByteBuffer* response);
const RpcServiceMethod* method() const { return method_; }
// This is only useful for the async mode. It should be called after
// RegisterService returns.
void* server_tag() const { return method_->server_tag(); }
private:
const DefaultHealthCheckService* service_;
const RpcServiceMethod* method_;
const DefaultHealthCheckService* const service_;
RpcServiceMethod* method_;
const bool sync_;
};
DefaultHealthCheckService();
void SetServingStatus(const grpc::string& service_name, bool serving) final;
void SetServingStatus(bool serving) final;
void SetServingStatus(const grpc::string& service_name,
bool serving) override;
void SetServingStatus(bool serving) override;
enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
ServingStatus GetServingStatus(const grpc::string& service_name) const;
SyncHealthCheckServiceImpl* GetSyncHealthCheckService() const {
return sync_service_.get();
}
AsyncHealthCheckServiceImpl* GetAsyncHealthCheckService() const {
return async_service_.get();
}
HealthCheckServiceImpl* GetHealthCheckService(bool sync);
private:
mutable std::mutex mu_;
std::map<grpc::string, bool> services_map_;
std::unique_ptr<SyncHealthCheckServiceImpl> sync_service_;
std::unique_ptr<AsyncHealthCheckServiceImpl> async_service_;
std::unique_ptr<HealthCheckServiceImpl> impl_;
};
} // namespace grpc

@ -144,14 +144,14 @@ class Server::HealthCheckAsyncRequest final
public RegisteredAsyncRequest {
public:
HealthCheckAsyncRequest(
DefaultHealthCheckService::AsyncHealthCheckServiceImpl* service,
DefaultHealthCheckService::HealthCheckServiceImpl* service,
Server* server, ServerCompletionQueue* cq)
: RegisteredAsyncRequest(server, &server_context_, &rpc_, cq, this,
false),
service_(service),
server_(server),
cq_(cq) {
IssueRequest(service->method()->server_tag(), &payload_, cq);
IssueRequest(service->server_tag(), &payload_, cq);
}
bool FinalizeResult(void** tag, bool* status) override;
@ -161,7 +161,7 @@ class Server::HealthCheckAsyncRequest final
ServerContext* server_context() { return &server_context_; }
private:
DefaultHealthCheckService::AsyncHealthCheckServiceImpl* service_;
DefaultHealthCheckService::HealthCheckServiceImpl* service_;
Server* const server_;
ServerCompletionQueue* const cq_;
grpc_byte_buffer* payload_;
@ -610,18 +610,14 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
DefaultHealthCheckService::AsyncHealthCheckServiceImpl* async_health_service =
nullptr;
DefaultHealthCheckService::HealthCheckServiceImpl* health_service = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
DefaultHealthCheckServiceEnabled()) {
auto* default_hc_service = new DefaultHealthCheckService;
health_check_service_.reset(default_hc_service);
if (!sync_server_cqs_->empty()) { // Has sync methods.
RegisterService(nullptr, default_hc_service->GetSyncHealthCheckService());
} else {
async_health_service = default_hc_service->GetAsyncHealthCheckService();
RegisterService(nullptr, async_health_service);
}
health_service =
default_hc_service->GetHealthCheckService(!sync_server_cqs_->empty());
RegisterService(nullptr, health_service);
}
grpc_server_start(server_);
@ -638,10 +634,10 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
if (async_health_service) {
if (health_service && !health_service->sync()) {
for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) {
new HealthCheckAsyncRequest(async_health_service, this, cqs[i]);
new HealthCheckAsyncRequest(health_service, this, cqs[i]);
}
}
}

Loading…
Cancel
Save