From 8e708b12cb1c88f5cb6e3984d887c3c2b9bee54f Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 29 Dec 2016 11:44:36 -0800 Subject: [PATCH] WIP --- include/grpc++/server.h | 1 + src/cpp/server/server_cc.cc | 103 ++++++++++---- .../end2end/health_service_end2end_test.cc | 126 +++++++++--------- 3 files changed, 141 insertions(+), 89 deletions(-) diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 9f31d00ef0b..3f205625ee7 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -121,6 +121,7 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { class HealthCheckAsyncRequestContext; class HealthCheckAsyncRequest; + class HealthCheckAsyncResponse; /// Server constructors. To be used by \a ServerBuilder only. /// diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 20641aeea8d..43f09470959 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -119,11 +119,24 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; +class HealthCheckAsyncResponseWriter final + : public ServerAsyncStreamingInterface { + public: + HealthCheckAsyncResponseWriter() : call_(nullptr, nullptr, nullptr) {} + void SendInitialMetadata(void* tag) override { + abort(); // should not be called. + } + void BindCall(Call* call) override { call_ = *call; } + Call* call() { return &call_; } + + private: + Call call_; +}; + class Server::HealthCheckAsyncRequestContext { protected: - HealthCheckAsyncRequestContext() : rpc_(&server_context_) {} ServerContext server_context_; - ServerAsyncResponseWriter rpc_; + HealthCheckAsyncResponseWriter rpc_; }; class Server::HealthCheckAsyncRequest final @@ -137,49 +150,86 @@ class Server::HealthCheckAsyncRequest final false), service_(service), server_(server), - cq_(cq), - had_request_(false) { + cq_(cq) { IssueRequest(service->method()->server_tag(), &payload_, cq); } bool FinalizeResult(void** tag, bool* status) override; + Call* call() { return rpc_.call(); } + ByteBuffer* response() { return &response_; } + Status* status() { return &status_; } + ServerContext* server_context() { return &server_context_; } private: DefaultHealthCheckService::AsyncHealthCheckServiceImpl* service_; Server* const server_; ServerCompletionQueue* const cq_; grpc_byte_buffer* payload_; - bool had_request_; ByteBuffer request_; ByteBuffer response_; + Status status_; +}; + +typedef SneakyCallOpSet + HealthCheckAsyncResponseOp; +class Server::HealthCheckAsyncResponse final + : public HealthCheckAsyncResponseOp { + public: + HealthCheckAsyncResponse(HealthCheckAsyncRequest* request); + ~HealthCheckAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) override { + HealthCheckAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return false; + } + + private: + HealthCheckAsyncRequest* const request_; }; bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (!had_request_) { - had_request_ = true; - bool serialization_status = - *status && payload_ && - SerializationTraits::Deserialize( - payload_, &request_, server_->max_receive_message_size()) - .ok(); - RegisteredAsyncRequest::FinalizeResult(tag, status); - *status = serialization_status && *status; - if (*status) { - new HealthCheckAsyncRequest(service_, server_, cq_); - Status s = service_->Check(&server_context_, &request_, &response_); - rpc_.Finish(response_, s, this); - return false; - } else { - // TODO what to do here - delete this; - return false; - } + bool serialization_status = + *status && payload_ && + SerializationTraits::Deserialize( + payload_, &request_, server_->max_receive_message_size()) + .ok(); + RegisteredAsyncRequest::FinalizeResult(tag, status); + *status = serialization_status && *status; + if (*status) { + new HealthCheckAsyncRequest(service_, server_, cq_); + status_ = service_->Check(&server_context_, &request_, &response_); + new HealthCheckAsyncResponse(this); + return false; } else { delete this; return false; } } +Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse( + HealthCheckAsyncRequest* request) + : request_(request) { + ServerContext* context = request_->server_context(); + if (!context->sent_initial_metadata_) { + SendInitialMetadata(context->initial_metadata_, + context->initial_metadata_flags()); + if (context->compression_level_set()) { + set_compression_level(context->compression_level()); + } + context->sent_initial_metadata_ = true; + } + Status* status = request_->status(); + if (status->ok()) { + ServerSendStatus(context->trailing_metadata_, + SendMessage(*request_->response())); + } else { + ServerSendStatus(context->trailing_metadata_, *status); + } + request_->call()->PerformOps(this); +} + class ShutdownTag : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } @@ -567,9 +617,10 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { auto* default_hc_service = new DefaultHealthCheckService; health_check_service_.reset(default_hc_service); if (!sync_server_cqs_->empty()) { // Has sync methods. + gpr_log(GPR_ERROR, "register sync"); // XXX RegisterService(nullptr, default_hc_service->GetSyncHealthCheckService()); - } - if (sync_server_cqs_->empty()) { // No sync methods. + } else { + gpr_log(GPR_ERROR, "register async"); // XXX async_health_service = default_hc_service->GetAsyncHealthCheckService(); RegisterService(nullptr, async_health_service); } diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc index c41a75ec37c..9f2df90207a 100644 --- a/test/cpp/end2end/health_service_end2end_test.cc +++ b/test/cpp/end2end/health_service_end2end_test.cc @@ -242,69 +242,69 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceDisabled) { SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, "")); } -TEST_F(HealthServiceEnd2endTest, DefaultHealthService) { - EnableDefaultHealthCheckService(true); - EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); - SetUpServer(true, false, nullptr); - VerifyHealthCheckService(); - - // The default service has a size limit of the service name. - const grpc::string kTooLongServiceName(201, 'x'); - SendHealthCheckRpc(kTooLongServiceName, - Status(StatusCode::INVALID_ARGUMENT, "")); -} - -void LoopCompletionQueue(ServerCompletionQueue* cq) { - void* tag; - bool ok; - while (cq->Next(&tag, &ok)) { - gpr_log(GPR_ERROR, "next %p %d", tag, ok); - } - gpr_log(GPR_ERROR, "returning from thread"); -} - -TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsync) { - EnableDefaultHealthCheckService(true); - EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); - SetUpServer(false, false, nullptr); - cq_thread_ = std::thread(LoopCompletionQueue, cq_.get()); - VerifyHealthCheckService(); - - // The default service has a size limit of the service name. - const grpc::string kTooLongServiceName(201, 'x'); - SendHealthCheckRpc(kTooLongServiceName, - Status(StatusCode::INVALID_ARGUMENT, "")); -} - -// Provide an empty service to disable the default service. -TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) { - EnableDefaultHealthCheckService(true); - EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); - std::unique_ptr empty_service; - SetUpServer(true, true, std::move(empty_service)); - HealthCheckServiceInterface* service = server_->GetHealthCheckService(); - EXPECT_TRUE(service == nullptr); - - ResetStubs(); - - SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, "")); -} - -// Provide an explicit override of health checking service interface. -TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) { - EnableDefaultHealthCheckService(true); - EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); - std::unique_ptr override_service( - new CustomHealthCheckService(&health_check_service_impl_)); - HealthCheckServiceInterface* underlying_service = override_service.get(); - SetUpServer(false, true, std::move(override_service)); - HealthCheckServiceInterface* service = server_->GetHealthCheckService(); - EXPECT_TRUE(service == underlying_service); - - ResetStubs(); - - VerifyHealthCheckService(); -} +// TEST_F(HealthServiceEnd2endTest, DefaultHealthService) { +// EnableDefaultHealthCheckService(true); +// EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); +// SetUpServer(true, false, nullptr); +// VerifyHealthCheckService(); +// +// // The default service has a size limit of the service name. +// const grpc::string kTooLongServiceName(201, 'x'); +// SendHealthCheckRpc(kTooLongServiceName, +// Status(StatusCode::INVALID_ARGUMENT, "")); +// } +// +// void LoopCompletionQueue(ServerCompletionQueue* cq) { +// void* tag; +// bool ok; +// while (cq->Next(&tag, &ok)) { +// abort(); // Nothing should come out of the cq. +// } +// gpr_log(GPR_ERROR, "returning from thread"); +// } +// +// TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsync) { +// EnableDefaultHealthCheckService(true); +// EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); +// SetUpServer(false, false, nullptr); +// cq_thread_ = std::thread(LoopCompletionQueue, cq_.get()); +// VerifyHealthCheckService(); +// +// // The default service has a size limit of the service name. +// const grpc::string kTooLongServiceName(201, 'x'); +// SendHealthCheckRpc(kTooLongServiceName, +// Status(StatusCode::INVALID_ARGUMENT, "")); +// } +// +// // Provide an empty service to disable the default service. +// TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) { +// EnableDefaultHealthCheckService(true); +// EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); +// std::unique_ptr empty_service; +// SetUpServer(true, true, std::move(empty_service)); +// HealthCheckServiceInterface* service = server_->GetHealthCheckService(); +// EXPECT_TRUE(service == nullptr); +// +// ResetStubs(); +// +// SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, "")); +// } +// +// // Provide an explicit override of health checking service interface. +// TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) { +// EnableDefaultHealthCheckService(true); +// EXPECT_TRUE(DefaultHealthCheckServiceEnabled()); +// std::unique_ptr override_service( +// new CustomHealthCheckService(&health_check_service_impl_)); +// HealthCheckServiceInterface* underlying_service = override_service.get(); +// SetUpServer(false, true, std::move(override_service)); +// HealthCheckServiceInterface* service = server_->GetHealthCheckService(); +// EXPECT_TRUE(service == underlying_service); +// +// ResetStubs(); +// +// VerifyHealthCheckService(); +// } } // namespace } // namespace testing