diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 785568f19a1..bd1b36e8839 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -158,8 +158,7 @@ class ServerInterface : public CallHook { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag, - bool delete_on_finalize); + CompletionQueue* call_cq, void* tag); // uses BaseAsyncRequest::FinalizeResult @@ -175,7 +174,7 @@ class ServerInterface : public CallHook { ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) - : RegisteredAsyncRequest(server, context, stream, call_cq, tag, true) { + : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { IssueRequest(registered_method, nullptr, notification_cq); } @@ -191,7 +190,7 @@ class ServerInterface : public CallHook { CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) - : RegisteredAsyncRequest(server, context, stream, call_cq, tag, true), + : RegisteredAsyncRequest(server, context, stream, call_cq, tag), request_(request) { IssueRequest(registered_method, &payload_, notification_cq); } diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 3f205625ee7..e3e9174c9c2 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -119,10 +119,6 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { class UnimplementedAsyncRequest; class UnimplementedAsyncResponse; - class HealthCheckAsyncRequestContext; - class HealthCheckAsyncRequest; - class HealthCheckAsyncResponse; - /// Server constructors. To be used by \a ServerBuilder only. /// /// \param max_message_size Maximum message length that the channel can diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 9743bd5775e..46def70e8a3 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -49,14 +49,11 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* service, bool sync) - : service_(service), method_(nullptr), sync_(sync) { - MethodHandler* handler = nullptr; - if (sync_) { - handler = - new RpcMethodHandler( - std::mem_fn(&HealthCheckServiceImpl::Check), this); - } + DefaultHealthCheckService* service) + : service_(service), method_(nullptr) { + MethodHandler* handler = + new RpcMethodHandler( + std::mem_fn(&HealthCheckServiceImpl::Check), this); method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, handler); AddMethod(method_); @@ -160,9 +157,9 @@ DefaultHealthCheckService::GetServingStatus( } DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService(bool sync) { +DefaultHealthCheckService::GetHealthCheckService() { GPR_ASSERT(impl_ == nullptr); - impl_.reset(new HealthCheckServiceImpl(this, sync)); + impl_.reset(new HealthCheckServiceImpl(this)); return impl_.get(); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 1ecb0a2ba94..5c0e2303423 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -49,21 +49,14 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - HealthCheckServiceImpl(DefaultHealthCheckService* service, bool sync); + explicit HealthCheckServiceImpl(DefaultHealthCheckService* service); Status Check(ServerContext* context, const ByteBuffer* request, ByteBuffer* response); - bool sync() { return sync_; } - - // This is only useful for the async mode. It should be called after - // RegisterService returns. - void* server_tag() const { return method_->server_tag(); } - private: const DefaultHealthCheckService* const service_; RpcServiceMethod* method_; - const bool sync_; }; DefaultHealthCheckService(); @@ -72,7 +65,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { void SetServingStatus(bool serving) override; enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; ServingStatus GetServingStatus(const grpc::string& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService(bool sync); + HealthCheckServiceImpl* GetHealthCheckService(); private: mutable std::mutex mu_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 69e491dc96b..c377297ec00 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -119,79 +119,6 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -// This is a dummy implementation of the interface so that -// HealthCheckAsyncRequest can get Call from RegisteredAsyncRequest. It does not -// do any reading or writing. -class HealthCheckAsyncResponseWriter final - : public ServerAsyncStreamingInterface { - public: - HealthCheckAsyncResponseWriter() : call_(nullptr, nullptr, nullptr) {} - void SendInitialMetadata(void* tag) override { - abort(); // should not be called. - } - void BindCall(Call* call) override { call_ = *call; } - Call* call() { return &call_; } - - private: - Call call_; -}; - -class Server::HealthCheckAsyncRequestContext { - protected: - ServerContext server_context_; - HealthCheckAsyncResponseWriter rpc_; -}; - -class Server::HealthCheckAsyncRequest final - : public HealthCheckAsyncRequestContext, - public RegisteredAsyncRequest { - public: - HealthCheckAsyncRequest( - DefaultHealthCheckService::HealthCheckServiceImpl* service, - Server* server, ServerCompletionQueue* cq) - : RegisteredAsyncRequest(server, &server_context_, &rpc_, cq, this, - false), - service_(service), - server_(server), - cq_(cq) { - IssueRequest(service->server_tag(), &payload_, cq); - } - - bool FinalizeResult(void** tag, bool* status) override; - Call* call() { return rpc_.call(); } - ByteBuffer* response() { return &response_; } - Status* status() { return &status_; } - ServerContext* server_context() { return &server_context_; } - - private: - DefaultHealthCheckService::HealthCheckServiceImpl* service_; - Server* const server_; - ServerCompletionQueue* const cq_; - grpc_byte_buffer* payload_; - ByteBuffer request_; - ByteBuffer response_; - Status status_; -}; - -typedef SneakyCallOpSet - HealthCheckAsyncResponseOp; -class Server::HealthCheckAsyncResponse final - : public HealthCheckAsyncResponseOp { - public: - HealthCheckAsyncResponse(HealthCheckAsyncRequest* request); - ~HealthCheckAsyncResponse() { delete request_; } - - bool FinalizeResult(void** tag, bool* status) override { - HealthCheckAsyncResponseOp::FinalizeResult(tag, status); - delete this; - return false; - } - - private: - HealthCheckAsyncRequest* const request_; -}; - class ShutdownTag : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } @@ -572,14 +499,16 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. - DefaultHealthCheckService::HealthCheckServiceImpl* health_service = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && DefaultHealthCheckServiceEnabled()) { - auto* default_hc_service = new DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - health_service = - default_hc_service->GetHealthCheckService(!sync_server_cqs_->empty()); - RegisterService(nullptr, health_service); + if (sync_server_cqs_->empty()) { + gpr_log(GPR_ERROR, + "Default health check service disabled at async-only server."); + } else { + auto* default_hc_service = new DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + RegisterService(nullptr, default_hc_service->GetHealthCheckService()); + } } grpc_server_start(server_); @@ -596,14 +525,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } } - if (health_service && !health_service->sync()) { - for (size_t i = 0; i < num_cqs; i++) { - if (cqs[i]->IsFrequentlyPolled()) { - new HealthCheckAsyncRequest(health_service, this, cqs[i]); - } - } - } - for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } @@ -715,10 +636,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, - bool delete_on_finalize) - : BaseAsyncRequest(server, context, stream, call_cq, tag, - delete_on_finalize) {} + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -776,45 +695,6 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( request_->stream()->call_.PerformOps(this); } -bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) { - bool serialization_status = - *status && payload_ && - SerializationTraits::Deserialize(payload_, &request_).ok(); - RegisteredAsyncRequest::FinalizeResult(tag, status); - *status = serialization_status && *status; - if (*status) { - new HealthCheckAsyncRequest(service_, server_, cq_); - status_ = service_->Check(&server_context_, &request_, &response_); - new HealthCheckAsyncResponse(this); - return false; - } else { - delete this; - return false; - } -} - -Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse( - HealthCheckAsyncRequest* request) - : request_(request) { - ServerContext* context = request_->server_context(); - if (!context->sent_initial_metadata_) { - SendInitialMetadata(context->initial_metadata_, - context->initial_metadata_flags()); - if (context->compression_level_set()) { - set_compression_level(context->compression_level()); - } - context->sent_initial_metadata_ = true; - } - Status* status = request_->status(); - if (status->ok()) { - ServerSendStatus(context->trailing_metadata_, - SendMessage(*request_->response())); - } else { - ServerSendStatus(context->trailing_metadata_, *status); - } - request_->call()->PerformOps(this); -} - ServerInitializer* Server::initializer() { return server_initializer_.get(); } } // namespace grpc diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc index 8a6a9886c9a..3d510078570 100644 --- a/test/cpp/end2end/health_service_end2end_test.cc +++ b/test/cpp/end2end/health_service_end2end_test.cc @@ -273,12 +273,13 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsyncOnly) { SetUpServer(false, true, false, nullptr); cq_thread_ = std::thread(LoopCompletionQueue, cq_.get()); - VerifyHealthCheckService(); + HealthCheckServiceInterface* default_service = + server_->GetHealthCheckService(); + EXPECT_TRUE(default_service == nullptr); - // The default service has a size limit of the service name. - const grpc::string kTooLongServiceName(201, 'x'); - SendHealthCheckRpc(kTooLongServiceName, - Status(StatusCode::INVALID_ARGUMENT, "")); + ResetStubs(); + + SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, "")); } // Provide an empty service to disable the default service.