From 16b67ae312df6b99b25307dbe4a18b0a461063b0 Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Mon, 21 Aug 2023 16:23:38 -0700 Subject: [PATCH] [PSM Interop] Add "hook service" (#34027) --- CMakeLists.txt | 80 +++++++ build_autogenerated.yaml | 35 ++++ src/proto/grpc/testing/messages.proto | 22 +- src/proto/grpc/testing/test.proto | 9 + test/cpp/interop/BUILD | 21 +- test/cpp/interop/pre_stop_hook_server.cc | 198 ++++++++++++++++++ test/cpp/interop/pre_stop_hook_server.h | 62 ++++++ test/cpp/interop/pre_stop_hook_server_test.cc | 176 ++++++++++++++++ test/cpp/interop/xds_interop_server_lib.cc | 31 ++- tools/run_tests/generated/tests.json | 24 +++ 10 files changed, 653 insertions(+), 5 deletions(-) create mode 100644 test/cpp/interop/pre_stop_hook_server.cc create mode 100644 test/cpp/interop/pre_stop_hook_server.h create mode 100644 test/cpp/interop/pre_stop_hook_server_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index e73ed867506..57c5765eacc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1149,6 +1149,7 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx posix_event_engine_test) endif() + add_dependencies(buildtests_cxx pre_stop_hook_server_test) add_dependencies(buildtests_cxx prioritized_race_test) add_dependencies(buildtests_cxx promise_endpoint_test) add_dependencies(buildtests_cxx promise_factory_test) @@ -17766,6 +17767,83 @@ endif() endif() if(gRPC_BUILD_TESTS) +add_executable(pre_stop_hook_server_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/health/v1/health.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/health/v1/health.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/health/v1/health.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/health/v1/health.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/istio_echo.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/istio_echo.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/istio_echo.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/istio_echo.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h + src/cpp/server/admin/admin_services.cc + src/cpp/server/csds/csds.cc + test/cpp/end2end/test_health_check_service_impl.cc + test/cpp/interop/pre_stop_hook_server.cc + test/cpp/interop/pre_stop_hook_server_test.cc + test/cpp/interop/xds_interop_server_lib.cc +) +target_compile_features(pre_stop_hook_server_test PUBLIC cxx_std_14) +target_include_directories(pre_stop_hook_server_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(pre_stop_hook_server_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + grpc++_reflection + grpcpp_channelz + grpc_test_util + grpc++_test_config +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(prioritized_race_test test/core/promise/prioritized_race_test.cc ) @@ -27492,6 +27570,7 @@ add_executable(xds_interop_server src/cpp/server/admin/admin_services.cc src/cpp/server/csds/csds.cc test/cpp/end2end/test_health_check_service_impl.cc + test/cpp/interop/pre_stop_hook_server.cc test/cpp/interop/xds_interop_server.cc test/cpp/interop/xds_interop_server_lib.cc ) @@ -27563,6 +27642,7 @@ add_executable(xds_interop_server_test src/cpp/server/admin/admin_services.cc src/cpp/server/csds/csds.cc test/cpp/end2end/test_health_check_service_impl.cc + test/cpp/interop/pre_stop_hook_server.cc test/cpp/interop/xds_interop_server_lib.cc test/cpp/interop/xds_interop_server_test.cc ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 73c018b8fcd..f74c2a16850 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -12223,6 +12223,37 @@ targets: platforms: - linux - posix +- name: pre_stop_hook_server_test + gtest: true + build: test + language: c++ + headers: + - src/cpp/server/csds/csds.h + - test/cpp/end2end/test_health_check_service_impl.h + - test/cpp/interop/pre_stop_hook_server.h + - test/cpp/interop/xds_interop_server_lib.h + src: + - src/proto/grpc/health/v1/health.proto + - src/proto/grpc/testing/empty.proto + - src/proto/grpc/testing/istio_echo.proto + - src/proto/grpc/testing/messages.proto + - src/proto/grpc/testing/test.proto + - src/proto/grpc/testing/xds/v3/base.proto + - src/proto/grpc/testing/xds/v3/config_dump.proto + - src/proto/grpc/testing/xds/v3/csds.proto + - src/proto/grpc/testing/xds/v3/percent.proto + - src/cpp/server/admin/admin_services.cc + - src/cpp/server/csds/csds.cc + - test/cpp/end2end/test_health_check_service_impl.cc + - test/cpp/interop/pre_stop_hook_server.cc + - test/cpp/interop/pre_stop_hook_server_test.cc + - test/cpp/interop/xds_interop_server_lib.cc + deps: + - gtest + - grpc++_reflection + - grpcpp_channelz + - grpc_test_util + - grpc++_test_config - name: prioritized_race_test gtest: true build: test @@ -17553,6 +17584,7 @@ targets: headers: - src/cpp/server/csds/csds.h - test/cpp/end2end/test_health_check_service_impl.h + - test/cpp/interop/pre_stop_hook_server.h - test/cpp/interop/xds_interop_server_lib.h src: - src/proto/grpc/health/v1/health.proto @@ -17566,6 +17598,7 @@ targets: - src/cpp/server/admin/admin_services.cc - src/cpp/server/csds/csds.cc - test/cpp/end2end/test_health_check_service_impl.cc + - test/cpp/interop/pre_stop_hook_server.cc - test/cpp/interop/xds_interop_server.cc - test/cpp/interop/xds_interop_server_lib.cc deps: @@ -17580,6 +17613,7 @@ targets: headers: - src/cpp/server/csds/csds.h - test/cpp/end2end/test_health_check_service_impl.h + - test/cpp/interop/pre_stop_hook_server.h - test/cpp/interop/xds_interop_server_lib.h src: - src/proto/grpc/health/v1/health.proto @@ -17594,6 +17628,7 @@ targets: - src/cpp/server/admin/admin_services.cc - src/cpp/server/csds/csds.cc - test/cpp/end2end/test_health_check_service_impl.cc + - test/cpp/interop/pre_stop_hook_server.cc - test/cpp/interop/xds_interop_server_lib.cc - test/cpp/interop/xds_interop_server_test.cc deps: diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index d948ef6a929..3e26bdb5f56 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -219,7 +219,7 @@ message LoadBalancerStatsResponse { message RpcMetadata { // metadata values for each rpc for the keys specified in // LoadBalancerStatsRequest.metadata_keys. - // metadata keys and values are returned exactly as was recieved + // metadata keys and values are returned exactly as was received // from the server. repeated MetadataEntry metadata = 1; } @@ -309,3 +309,23 @@ message TestOrcaReport { map request_cost = 3; map utilization = 4; } + +enum HookRequestCommand { + // Start the HTTP endpoint + START = 0; + // Stop + STOP = 1; + // Return from HTTP GET/POST + RETURN = 2; +} + +message HookRequest { + HookRequestCommand command = 1; + int32 grpc_code_to_return = 2; + string grpc_status_description = 3; + // Server port to listen to + int32 server_port = 4; +} + +message HookResponse { +} diff --git a/src/proto/grpc/testing/test.proto b/src/proto/grpc/testing/test.proto index 5805a41da99..23a7707191b 100644 --- a/src/proto/grpc/testing/test.proto +++ b/src/proto/grpc/testing/test.proto @@ -23,6 +23,8 @@ import "src/proto/grpc/testing/messages.proto"; package grpc.testing; +option java_package = "io.grpc.testing.integration"; + // A simple service to test the various types of RPCs and experiment with // performance with various types of payload. service TestService { @@ -89,10 +91,17 @@ service LoadBalancerStatsService { returns (LoadBalancerAccumulatedStatsResponse) {} } +// Hook service that may be started on request by calling XdsUpdateHealthService +// with HookRequestCommand::START +service HookService { + rpc Hook(grpc.testing.Empty) returns (grpc.testing.Empty); +} + // A service to remotely control health status of an xDS test server. service XdsUpdateHealthService { rpc SetServing(grpc.testing.Empty) returns (grpc.testing.Empty); rpc SetNotServing(grpc.testing.Empty) returns (grpc.testing.Empty); + rpc SendHookRequest(HookRequest) returns (HookResponse); } // A service to dynamically update the configuration of an xDS test client. diff --git a/test/cpp/interop/BUILD b/test/cpp/interop/BUILD index c1f31ad2a6b..a3bc063b1a4 100644 --- a/test/cpp/interop/BUILD +++ b/test/cpp/interop/BUILD @@ -287,9 +287,13 @@ grpc_cc_library( name = "xds_interop_server_lib", testonly = True, srcs = [ + "pre_stop_hook_server.cc", "xds_interop_server_lib.cc", ], - hdrs = ["xds_interop_server_lib.h"], + hdrs = [ + "pre_stop_hook_server.h", + "xds_interop_server_lib.h", + ], deps = [ "//:grpc++", "//:grpc++_reflection", @@ -319,6 +323,21 @@ grpc_cc_binary( ], ) +grpc_cc_test( + name = "pre_stop_hook_server_test", + srcs = [ + "pre_stop_hook_server_test.cc", + ], + external_deps = ["gtest"], + deps = [ + ":xds_interop_server_lib", + "//:grpc++", + "//src/proto/grpc/testing:istio_echo_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_config", + ], +) + grpc_cc_test( name = "xds_interop_server_test", srcs = [ diff --git a/test/cpp/interop/pre_stop_hook_server.cc b/test/cpp/interop/pre_stop_hook_server.cc new file mode 100644 index 00000000000..6ddaceda5db --- /dev/null +++ b/test/cpp/interop/pre_stop_hook_server.cc @@ -0,0 +1,198 @@ +// +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include "test/cpp/interop/pre_stop_hook_server.h" + +#include + +#include "absl/strings/str_format.h" + +#include + +#include "src/proto/grpc/testing/test.grpc.pb.h" + +namespace grpc { +namespace testing { +namespace { + +class HookServiceImpl final : public HookService::CallbackService { + public: + ServerUnaryReactor* Hook(CallbackServerContext* context, + const Empty* /* request */, + Empty* /* reply */) override { + grpc_core::MutexLock lock(&mu_); + auto reactor = context->DefaultReactor(); + if (pending_status_) { + reactor->Finish(std::move(*pending_status_)); + pending_status_ = absl::nullopt; + } else if (done_) { + reactor->Finish(Status(StatusCode::ABORTED, "Shutting down")); + } else { + pending_requests_.push_back(reactor); + } + request_var_.SignalAll(); + return reactor; + } + + void SetReturnStatus(const Status& status) { + grpc_core::MutexLock lock(&mu_); + if (pending_requests_.empty()) { + pending_status_ = status; + } else { + auto reactor = pending_requests_.begin(); + (*reactor)->Finish(status); + pending_requests_.erase(reactor); + } + request_var_.SignalAll(); + } + + bool TestOnlyExpectRequests(size_t expected_requests_count, + const absl::Duration& timeout) { + grpc_core::MutexLock lock(&mu_); + auto deadline = absl::Now() + timeout; + while (pending_requests_.size() < expected_requests_count && + !request_var_.WaitWithDeadline(&mu_, deadline)) { + } + return pending_requests_.size() >= expected_requests_count; + } + + void Stop() { + grpc_core::MutexLock lock(&mu_); + for (auto request : pending_requests_) { + request->Finish(Status(StatusCode::ABORTED, "Shutting down")); + } + pending_requests_.clear(); + } + + private: + grpc_core::Mutex mu_; + grpc_core::CondVar request_var_ ABSL_GUARDED_BY(&mu_); + absl::optional pending_status_ ABSL_GUARDED_BY(&mu_); + std::vector pending_requests_ ABSL_GUARDED_BY(&mu_); + bool done_ ABSL_GUARDED_BY(&mu_) = false; +}; + +enum class State { kNew, kWaiting, kDone, kShuttingDown }; + +std::unique_ptr BuildHookServer(HookServiceImpl* service, int port) { + ServerBuilder builder; + builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port), + grpc::InsecureServerCredentials()); + builder.RegisterService(service); + return builder.BuildAndStart(); +} + +} // namespace + +class PreStopHookServer { + public: + explicit PreStopHookServer(int port, const absl::Duration& startup_timeout) + : server_(BuildHookServer(&hook_service_, port)), + server_thread_(PreStopHookServer::ServerThread, this) { + WaitForState(State::kWaiting, startup_timeout); + } + + ~PreStopHookServer() { + hook_service_.Stop(); + SetState(State::kShuttingDown); + server_->Shutdown(); + server_thread_.join(); + } + + State GetState() { + grpc_core::MutexLock lock(&mu_); + return state_; + } + + void SetState(State state) { + grpc_core::MutexLock lock(&mu_); + state_ = state; + condition_.SignalAll(); + } + + void SetReturnStatus(const Status& status) { + hook_service_.SetReturnStatus(status); + } + + bool TestOnlyExpectRequests(size_t expected_requests_count, + absl::Duration timeout) { + return hook_service_.TestOnlyExpectRequests(expected_requests_count, + timeout); + } + + private: + bool WaitForState(State state, const absl::Duration& timeout) { + grpc_core::MutexLock lock(&mu_); + auto deadline = absl::Now() + timeout; + while (state_ != state && !condition_.WaitWithDeadline(&mu_, deadline)) { + } + return state_ == state; + } + + static void ServerThread(PreStopHookServer* server) { + server->SetState(State::kWaiting); + server->server_->Wait(); + server->SetState(State::kDone); + } + + HookServiceImpl hook_service_; + grpc_core::Mutex mu_; + grpc_core::CondVar condition_ ABSL_GUARDED_BY(mu_); + State state_ ABSL_GUARDED_BY(mu_) = State::kNew; + std::unique_ptr server_; + std::thread server_thread_; +}; + +Status PreStopHookServerManager::Start(int port, size_t timeout_s) { + if (server_) { + return Status(StatusCode::ALREADY_EXISTS, + "Pre hook server is already running"); + } + server_ = std::unique_ptr( + new PreStopHookServer(port, absl::Seconds(timeout_s)), + PreStopHookServerDeleter()); + return server_->GetState() == State::kWaiting + ? Status::OK + : Status(StatusCode::DEADLINE_EXCEEDED, "Server have not started"); +} + +Status PreStopHookServerManager::Stop() { + if (!server_) { + return Status(StatusCode::UNAVAILABLE, "Pre hook server is not running"); + } + server_.reset(); + return Status::OK; +} + +void PreStopHookServerManager::Return(StatusCode code, + absl::string_view description) { + server_->SetReturnStatus(Status(code, std::string(description))); +} + +bool PreStopHookServerManager::TestOnlyExpectRequests( + size_t expected_requests_count, const absl::Duration& timeout) { + return server_->TestOnlyExpectRequests(expected_requests_count, timeout); +} + +void PreStopHookServerManager::PreStopHookServerDeleter::operator()( + PreStopHookServer* server) { + delete server; +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/interop/pre_stop_hook_server.h b/test/cpp/interop/pre_stop_hook_server.h new file mode 100644 index 00000000000..570e264c555 --- /dev/null +++ b/test/cpp/interop/pre_stop_hook_server.h @@ -0,0 +1,62 @@ +// +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_TEST_CPP_INTEROP_PRE_STOP_HOOK_SERVER_H +#define GRPC_TEST_CPP_INTEROP_PRE_STOP_HOOK_SERVER_H + +#include + +#include + +#include + +#include "src/core/lib/config/core_configuration.h" + +namespace grpc { +namespace testing { + +// Implementation of the pre-stop hook server. An instance is created to start +// a server and destroyed to stop one. +class PreStopHookServer; + +// Interface for interacting with PreStopHookServer. Provides operations +// required by the protocol, such as start, stop and return from the call. +class PreStopHookServerManager { + public: + Status Start(int port, size_t timeout_s); + Status Stop(); + void Return(StatusCode code, absl::string_view description); + // Suspends the thread until there are pending requests. Returns false + // if the necessary number of requests have not been received before the + // timeout. + bool TestOnlyExpectRequests( + size_t expected_requests_count, + const absl::Duration& timeout = absl::Seconds(15)); + + private: + // Custom deleter so we don't have to include PreStopHookServer in this header + struct PreStopHookServerDeleter { + void operator()(PreStopHookServer* server); + }; + + std::unique_ptr server_; +}; + +} // namespace testing +} // namespace grpc +#endif // GRPC_TEST_CPP_INTEROP_PRE_STOP_HOOK_SERVER_H diff --git a/test/cpp/interop/pre_stop_hook_server_test.cc b/test/cpp/interop/pre_stop_hook_server_test.cc new file mode 100644 index 00000000000..a2fcc562603 --- /dev/null +++ b/test/cpp/interop/pre_stop_hook_server_test.cc @@ -0,0 +1,176 @@ +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "test/cpp/interop/pre_stop_hook_server.h" + +#include +#include + +#include +#include + +#include "absl/strings/str_format.h" + +#include +#include + +#include "src/proto/grpc/testing/test.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +namespace grpc { +namespace testing { +namespace { + +struct CallInfo { + public: + ClientContext context; + Empty request; + Empty response; + + absl::optional WaitForStatus( + absl::Duration timeout = absl::Seconds(1)) { + grpc_core::MutexLock lock(&mu); + if (!status_.has_value()) { + cv.WaitWithTimeout(&mu, timeout); + } + return status_; + } + + void SetStatus(const Status& status) { + grpc_core::MutexLock lock(&mu); + status_ = status; + cv.SignalAll(); + } + + private: + grpc_core::Mutex mu; + grpc_core::CondVar cv; + absl::optional status_; +}; + +TEST(PreStopHookServer, StartDoRequestStop) { + int port = grpc_pick_unused_port_or_die(); + PreStopHookServerManager server; + Status start_status = server.Start(port, 15); + ASSERT_TRUE(start_status.ok()) << start_status.error_message(); + auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port), + InsecureChannelCredentials()); + ASSERT_TRUE(channel); + CallInfo info; + HookService::Stub stub(std::move(channel)); + stub.async()->Hook(&info.context, &info.request, &info.response, + [&info](Status status) { info.SetStatus(status); }); + ASSERT_EQ(server.TestOnlyExpectRequests(1), 1); + server.Return(StatusCode::INTERNAL, "Just a test"); + auto status = info.WaitForStatus(); + ASSERT_TRUE(status.has_value()); + EXPECT_EQ(status->error_code(), StatusCode::INTERNAL); + EXPECT_EQ(status->error_message(), "Just a test"); +} + +TEST(PreStopHookServer, StartServerWhileAlreadyRunning) { + int port = grpc_pick_unused_port_or_die(); + PreStopHookServerManager server; + Status status = server.Start(port, 15); + ASSERT_TRUE(status.ok()) << status.error_message(); + status = server.Start(port, 15); + ASSERT_EQ(status.error_code(), StatusCode::ALREADY_EXISTS) + << status.error_message(); +} + +TEST(PreStopHookServer, StopServerWhileRequestPending) { + int port = grpc_pick_unused_port_or_die(); + PreStopHookServerManager server; + Status start_status = server.Start(port, 15); + ASSERT_TRUE(start_status.ok()) << start_status.error_message(); + auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port), + InsecureChannelCredentials()); + ASSERT_TRUE(channel); + CallInfo info; + HookService::Stub stub(std::move(channel)); + stub.async()->Hook(&info.context, &info.request, &info.response, + [&info](Status status) { info.SetStatus(status); }); + ASSERT_EQ(server.TestOnlyExpectRequests(1), 1); + ASSERT_TRUE(server.Stop().ok()); + auto status = info.WaitForStatus(); + ASSERT_TRUE(status.has_value()); + EXPECT_EQ(status->error_code(), StatusCode::ABORTED); +} + +TEST(PreStopHookServer, RespondToMultiplePendingRequests) { + std::array info; + int port = grpc_pick_unused_port_or_die(); + PreStopHookServerManager server; + Status start_status = server.Start(port, 15); + ASSERT_TRUE(start_status.ok()) << start_status.error_message(); + auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port), + InsecureChannelCredentials()); + ASSERT_TRUE(channel); + HookService::Stub stub(std::move(channel)); + stub.async()->Hook(&info[0].context, &info[0].request, &info[0].response, + [&info](Status status) { info[0].SetStatus(status); }); + ASSERT_EQ(server.TestOnlyExpectRequests(1), 1); + stub.async()->Hook(&info[1].context, &info[1].request, &info[1].response, + [&info](Status status) { info[1].SetStatus(status); }); + server.TestOnlyExpectRequests(2); + server.Return(StatusCode::INTERNAL, "Just a test"); + auto status = info[0].WaitForStatus(); + ASSERT_TRUE(status.has_value()); + EXPECT_EQ(status->error_code(), StatusCode::INTERNAL); + EXPECT_EQ(status->error_message(), "Just a test"); + status = info[1].WaitForStatus(); + EXPECT_FALSE(status.has_value()); +} + +TEST(PreStopHookServer, StopServerThatNotStarted) { + PreStopHookServerManager server; + Status status = server.Stop(); + EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE) + << status.error_message(); +} + +TEST(PreStopHookServer, SetStatusBeforeRequestReceived) { + int port = grpc_pick_unused_port_or_die(); + PreStopHookServerManager server; + Status start_status = server.Start(port, 15); + server.Return(StatusCode::INTERNAL, "Just a test"); + ASSERT_TRUE(start_status.ok()) << start_status.error_message(); + auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port), + InsecureChannelCredentials()); + ASSERT_TRUE(channel); + HookService::Stub stub(std::move(channel)); + CallInfo info; + stub.async()->Hook(&info.context, &info.request, &info.response, + [&info](Status status) { info.SetStatus(status); }); + auto status = info.WaitForStatus(); + ASSERT_TRUE(status.has_value()); + EXPECT_EQ(status->error_code(), StatusCode::INTERNAL); + EXPECT_EQ(status->error_message(), "Just a test"); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/cpp/interop/xds_interop_server_lib.cc b/test/cpp/interop/xds_interop_server_lib.cc index 310a5d44948..358b74b397a 100644 --- a/test/cpp/interop/xds_interop_server_lib.cc +++ b/test/cpp/interop/xds_interop_server_lib.cc @@ -41,6 +41,7 @@ #include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" #include "test/cpp/end2end/test_health_check_service_impl.h" +#include "test/cpp/interop/pre_stop_hook_server.h" namespace grpc { namespace testing { @@ -113,8 +114,10 @@ class TestServiceImpl : public TestService::Service { class XdsUpdateHealthServiceImpl : public XdsUpdateHealthService::Service { public: explicit XdsUpdateHealthServiceImpl( - HealthCheckServiceImpl* health_check_service) - : health_check_service_(health_check_service) {} + HealthCheckServiceImpl* health_check_service, + std::unique_ptr pre_stop_hook_server) + : health_check_service_(health_check_service), + pre_stop_hook_server_(std::move(pre_stop_hook_server)) {} Status SetServing(ServerContext* /* context */, const Empty* /* request */, Empty* /* response */) override { @@ -130,8 +133,29 @@ class XdsUpdateHealthServiceImpl : public XdsUpdateHealthService::Service { return Status::OK; } + Status SendHookRequest(ServerContext* /* context */, + const HookRequest* request, + HookResponse* /* response */) override { + switch (request->command()) { + case HookRequestCommand::START: + return pre_stop_hook_server_->Start(request->server_port(), 30 /* s */); + case HookRequestCommand::STOP: + return pre_stop_hook_server_->Stop(); + case HookRequestCommand::RETURN: + pre_stop_hook_server_->Return( + static_cast(request->grpc_code_to_return()), + request->grpc_status_description()); + return Status::OK; + default: + return Status( + StatusCode::INVALID_ARGUMENT, + absl::StrFormat("Invalid command %d", request->command())); + } + } + private: HealthCheckServiceImpl* const health_check_service_; + std::unique_ptr pre_stop_hook_server_; }; } // namespace @@ -189,7 +213,8 @@ void RunServer(bool secure_mode, const int port, const int maintenance_port, health_check_service.SetStatus( "grpc.testing.XdsUpdateHealthService", grpc::health::v1::HealthCheckResponse::SERVING); - XdsUpdateHealthServiceImpl update_health_service(&health_check_service); + XdsUpdateHealthServiceImpl update_health_service( + &health_check_service, std::make_unique()); grpc::reflection::InitProtoReflectionServerBuilderPlugin(); ServerBuilder builder; diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index f25c9de92a5..05be005caf8 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -6639,6 +6639,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "pre_stop_hook_server_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,