Add a Shutdown call to HealthCheckServiceInterface

reviewable/pr17435/r1
yang-g 6 years ago
parent e97c9457e2
commit f1f557bc43
  1. 4
      include/grpcpp/health_check_service_interface.h
  2. 18
      src/cpp/server/health/default_health_check_service.cc
  3. 3
      src/cpp/server/health/default_health_check_service.h
  4. 107
      test/cpp/end2end/health_service_end2end_test.cc

@ -37,6 +37,10 @@ class HealthCheckServiceInterface {
bool serving) = 0; bool serving) = 0;
/// Apply to all registered service names. /// Apply to all registered service names.
virtual void SetServingStatus(bool serving) = 0; virtual void SetServingStatus(bool serving) = 0;
/// Set all registered service names to not serving and prevent future
/// state changes.
virtual void Shutdown() {}
}; };
/// Enable/disable the default health checking service. This applies to all C++ /// Enable/disable the default health checking service. This applies to all C++

@ -42,18 +42,36 @@ DefaultHealthCheckService::DefaultHealthCheckService() {
void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(
const grpc::string& service_name, bool serving) { const grpc::string& service_name, bool serving) {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
} }
void DefaultHealthCheckService::SetServingStatus(bool serving) { void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING; const ServingStatus status = serving ? SERVING : NOT_SERVING;
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
for (auto& p : services_map_) { for (auto& p : services_map_) {
ServiceData& service_data = p.second; ServiceData& service_data = p.second;
service_data.SetServingStatus(status); service_data.SetServingStatus(status);
} }
} }
void DefaultHealthCheckService::Shutdown() {
std::unique_lock<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
shutdown_ = true;
for (auto& p : services_map_) {
ServiceData& service_data = p.second;
service_data.SetServingStatus(NOT_SERVING);
}
}
DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus( DefaultHealthCheckService::GetServingStatus(
const grpc::string& service_name) const { const grpc::string& service_name) const {

@ -237,6 +237,8 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
bool serving) override; bool serving) override;
void SetServingStatus(bool serving) override; void SetServingStatus(bool serving) override;
void Shutdown() override;
ServingStatus GetServingStatus(const grpc::string& service_name) const; ServingStatus GetServingStatus(const grpc::string& service_name) const;
HealthCheckServiceImpl* GetHealthCheckService( HealthCheckServiceImpl* GetHealthCheckService(
@ -272,6 +274,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
mutable std::mutex mu_; mutable std::mutex mu_;
bool shutdown_ = false; // Guarded by mu_.
std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_; std::unique_ptr<HealthCheckServiceImpl> impl_;
}; };

@ -90,18 +90,36 @@ class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service {
void SetStatus(const grpc::string& service_name, void SetStatus(const grpc::string& service_name,
HealthCheckResponse::ServingStatus status) { HealthCheckResponse::ServingStatus status) {
std::lock_guard<std::mutex> lock(mu_); std::lock_guard<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
status_map_[service_name] = status; status_map_[service_name] = status;
} }
void SetAll(HealthCheckResponse::ServingStatus status) { void SetAll(HealthCheckResponse::ServingStatus status) {
std::lock_guard<std::mutex> lock(mu_); std::lock_guard<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) { for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) {
iter->second = status; iter->second = status;
} }
} }
void Shutdown() {
std::lock_guard<std::mutex> lock(mu_);
if (shutdown_) {
return;
}
shutdown_ = true;
for (auto iter = status_map_.begin(); iter != status_map_.end(); ++iter) {
iter->second = HealthCheckResponse::NOT_SERVING;
}
}
private: private:
std::mutex mu_; std::mutex mu_;
bool shutdown_ = false;
std::map<const grpc::string, HealthCheckResponse::ServingStatus> status_map_; std::map<const grpc::string, HealthCheckResponse::ServingStatus> status_map_;
}; };
@ -125,6 +143,8 @@ class CustomHealthCheckService : public HealthCheckServiceInterface {
: HealthCheckResponse::NOT_SERVING); : HealthCheckResponse::NOT_SERVING);
} }
void Shutdown() override { impl_->Shutdown(); }
private: private:
HealthCheckServiceImpl* impl_; // not owned HealthCheckServiceImpl* impl_; // not owned
}; };
@ -260,6 +280,71 @@ class HealthServiceEnd2endTest : public ::testing::Test {
context.TryCancel(); context.TryCancel();
} }
// Verify that after HealthCheckServiceInterface::Shutdown is called
// 1. unary client will see NOT_SERVING.
// 2. unary client still sees NOT_SERVING after a SetServing(true) is called.
// 3. streaming (Watch) client will see an update.
// This has to be called last.
void VerifyHealthCheckServiceShutdown() {
const grpc::string kServiceName("service_name");
HealthCheckServiceInterface* service = server_->GetHealthCheckService();
EXPECT_TRUE(service != nullptr);
const grpc::string kHealthyService("healthy_service");
const grpc::string kUnhealthyService("unhealthy_service");
const grpc::string kNotRegisteredService("not_registered");
service->SetServingStatus(kHealthyService, true);
service->SetServingStatus(kUnhealthyService, false);
ResetStubs();
// Start Watch for service.
ClientContext context;
HealthCheckRequest request;
request.set_service(kServiceName);
std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader =
hc_stub_->Watch(&context, request);
// Initial response will be SERVICE_UNKNOWN.
HealthCheckResponse response;
EXPECT_TRUE(reader->Read(&response));
EXPECT_EQ(response.SERVICE_UNKNOWN, response.status());
// Set service to SERVING and make sure we get an update.
service->SetServingStatus(kServiceName, true);
EXPECT_TRUE(reader->Read(&response));
EXPECT_EQ(response.SERVING, response.status());
SendHealthCheckRpc("", Status::OK, HealthCheckResponse::SERVING);
SendHealthCheckRpc(kHealthyService, Status::OK,
HealthCheckResponse::SERVING);
SendHealthCheckRpc(kUnhealthyService, Status::OK,
HealthCheckResponse::NOT_SERVING);
SendHealthCheckRpc(kNotRegisteredService,
Status(StatusCode::NOT_FOUND, ""));
// Shutdown health check service.
service->Shutdown();
// Watch client gets another update.
EXPECT_TRUE(reader->Read(&response));
EXPECT_EQ(response.NOT_SERVING, response.status());
// Finish Watch call.
context.TryCancel();
SendHealthCheckRpc("", Status::OK, HealthCheckResponse::NOT_SERVING);
SendHealthCheckRpc(kHealthyService, Status::OK,
HealthCheckResponse::NOT_SERVING);
SendHealthCheckRpc(kUnhealthyService, Status::OK,
HealthCheckResponse::NOT_SERVING);
SendHealthCheckRpc(kNotRegisteredService,
Status(StatusCode::NOT_FOUND, ""));
// Setting status after Shutdown has no effect.
service->SetServingStatus(kHealthyService, true);
SendHealthCheckRpc(kHealthyService, Status::OK,
HealthCheckResponse::NOT_SERVING);
}
TestServiceImpl echo_test_service_; TestServiceImpl echo_test_service_;
HealthCheckServiceImpl health_check_service_impl_; HealthCheckServiceImpl health_check_service_impl_;
std::unique_ptr<Health::Stub> hc_stub_; std::unique_ptr<Health::Stub> hc_stub_;
@ -295,6 +380,13 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
Status(StatusCode::INVALID_ARGUMENT, "")); Status(StatusCode::INVALID_ARGUMENT, ""));
} }
TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceShutdown) {
EnableDefaultHealthCheckService(true);
EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
SetUpServer(true, false, false, nullptr);
VerifyHealthCheckServiceShutdown();
}
// Provide an empty service to disable the default service. // Provide an empty service to disable the default service.
TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) { TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) {
EnableDefaultHealthCheckService(true); EnableDefaultHealthCheckService(true);
@ -326,6 +418,21 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) {
VerifyHealthCheckServiceStreaming(); VerifyHealthCheckServiceStreaming();
} }
TEST_F(HealthServiceEnd2endTest, ExplicitlyHealthServiceShutdown) {
EnableDefaultHealthCheckService(true);
EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
std::unique_ptr<HealthCheckServiceInterface> override_service(
new CustomHealthCheckService(&health_check_service_impl_));
HealthCheckServiceInterface* underlying_service = override_service.get();
SetUpServer(false, false, true, std::move(override_service));
HealthCheckServiceInterface* service = server_->GetHealthCheckService();
EXPECT_TRUE(service == underlying_service);
ResetStubs();
VerifyHealthCheckServiceShutdown();
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save