more cleanup

pull/9205/head
yang-g 8 years ago
parent a36ea4be8b
commit b90631d3f0
  1. 13
      include/grpc++/health_check_service_interface.h
  2. 2
      src/cpp/server/health/health_check_service_server_builder_option.cc
  3. 89
      src/cpp/server/server_cc.cc

@ -38,26 +38,31 @@
namespace grpc {
const char kDefaultHealthCheckServiceInterfaceArg[] =
"grpc.default_health_check_service_interface";
const char kHealthCheckServiceInterfaceArg[] =
"grpc.health_check_service_interface";
// The gRPC server uses this interface to expose the health checking service
// without depending on protobuf.
class HealthCheckServiceInterface {
public:
virtual ~HealthCheckServiceInterface() {}
// Set or change the serving status of the given service_name.
virtual void SetServingStatus(const grpc::string& service_name,
bool serving) = 0;
// Apply to all registered service names.
virtual void SetServingStatus(bool serving) = 0;
};
bool DefaultHealthCheckServiceEnabled();
// Enable/disable the default health checking service. This applies to all C++
// servers created afterwards. For each server, user can override the default
// with a HealthCheckServiceServerBuilderOption.
// NOT thread safe.
void EnableDefaultHealthCheckService(bool enable);
// NOT thread safe.
bool DefaultHealthCheckServiceEnabled();
} // namespace grpc
#endif // GRPCXX_HEALTH_CHECK_SERVICE_INTERFACE_H

@ -41,7 +41,7 @@ HealthCheckServiceServerBuilderOption::HealthCheckServiceServerBuilderOption(
// Hand over hc_ to the server.
void HealthCheckServiceServerBuilderOption::UpdateArguments(
ChannelArguments* args) {
args->SetPointer(kDefaultHealthCheckServiceInterfaceArg, hc_.release());
args->SetPointer(kHealthCheckServiceInterfaceArg, hc_.release());
}
void HealthCheckServiceServerBuilderOption::UpdatePlugins(

@ -119,6 +119,9 @@ class Server::UnimplementedAsyncResponse final
UnimplementedAsyncRequest* const request_;
};
// This is a dummy implementation of the interface so that
// HealthCheckAsyncRequest can get Call from RegisteredAsyncRequest. It does not
// do any reading or writing.
class HealthCheckAsyncResponseWriter final
: public ServerAsyncStreamingInterface {
public:
@ -189,47 +192,6 @@ class Server::HealthCheckAsyncResponse final
HealthCheckAsyncRequest* const request_;
};
bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) {
bool serialization_status =
*status && payload_ &&
SerializationTraits<ByteBuffer>::Deserialize(
payload_, &request_, server_->max_receive_message_size())
.ok();
RegisteredAsyncRequest::FinalizeResult(tag, status);
*status = serialization_status && *status;
if (*status) {
new HealthCheckAsyncRequest(service_, server_, cq_);
status_ = service_->Check(&server_context_, &request_, &response_);
new HealthCheckAsyncResponse(this);
return false;
} else {
delete this;
return false;
}
}
Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse(
HealthCheckAsyncRequest* request)
: request_(request) {
ServerContext* context = request_->server_context();
if (!context->sent_initial_metadata_) {
SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
if (context->compression_level_set()) {
set_compression_level(context->compression_level());
}
context->sent_initial_metadata_ = true;
}
Status* status = request_->status();
if (status->ok()) {
ServerSendStatus(context->trailing_metadata_,
SendMessage(*request_->response()));
} else {
ServerSendStatus(context->trailing_metadata_, *status);
}
request_->call()->PerformOps(this);
}
class ShutdownTag : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
@ -490,8 +452,8 @@ Server::Server(
args->SetChannelArgs(&channel_args);
for (size_t i = 0; i < channel_args.num_args; i++) {
if (0 == strcmp(channel_args.args[i].key,
kDefaultHealthCheckServiceInterfaceArg)) {
if (0 ==
strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) {
if (channel_args.args[i].value.pointer.p == nullptr) {
health_check_service_disabled_ = true;
} else {
@ -822,6 +784,47 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) {
bool serialization_status =
*status && payload_ &&
SerializationTraits<ByteBuffer>::Deserialize(
payload_, &request_, server_->max_receive_message_size())
.ok();
RegisteredAsyncRequest::FinalizeResult(tag, status);
*status = serialization_status && *status;
if (*status) {
new HealthCheckAsyncRequest(service_, server_, cq_);
status_ = service_->Check(&server_context_, &request_, &response_);
new HealthCheckAsyncResponse(this);
return false;
} else {
delete this;
return false;
}
}
Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse(
HealthCheckAsyncRequest* request)
: request_(request) {
ServerContext* context = request_->server_context();
if (!context->sent_initial_metadata_) {
SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
if (context->compression_level_set()) {
set_compression_level(context->compression_level());
}
context->sent_initial_metadata_ = true;
}
Status* status = request_->status();
if (status->ok()) {
ServerSendStatus(context->trailing_metadata_,
SendMessage(*request_->response()));
} else {
ServerSendStatus(context->trailing_metadata_, *status);
}
request_->call()->PerformOps(this);
}
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
} // namespace grpc

Loading…
Cancel
Save