diff --git a/test/cpp/interop/BUILD b/test/cpp/interop/BUILD index 6e843380ccb..1d55b9fcc9f 100644 --- a/test/cpp/interop/BUILD +++ b/test/cpp/interop/BUILD @@ -340,3 +340,19 @@ grpc_cc_binary( "//test/cpp/util:test_util", ], ) + +grpc_cc_binary( + name = "xds_federation_client", + srcs = [ + "xds_federation_client.cc", + ], + external_deps = [ + "absl/flags:flag", + ], + language = "C++", + deps = [ + ":client_helper_lib", + "//:grpc++", + "//test/cpp/util:test_config", + ], +) diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index bbf53420299..0c4a8c45afc 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -300,14 +300,14 @@ int main(int argc, char** argv) { std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client); actions["channel_soak"] = std::bind( &grpc::testing::InteropClient::DoChannelSoakTest, &client, - absl::GetFlag(FLAGS_soak_iterations), + absl::GetFlag(FLAGS_server_host), absl::GetFlag(FLAGS_soak_iterations), absl::GetFlag(FLAGS_soak_max_failures), absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms), absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs), absl::GetFlag(FLAGS_soak_overall_timeout_seconds)); actions["rpc_soak"] = std::bind( &grpc::testing::InteropClient::DoRpcSoakTest, &client, - absl::GetFlag(FLAGS_soak_iterations), + absl::GetFlag(FLAGS_server_host), absl::GetFlag(FLAGS_soak_iterations), absl::GetFlag(FLAGS_soak_max_failures), absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms), absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs), diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index c9a2175815b..123c08a6258 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -1056,8 +1056,8 @@ InteropClient::PerformOneSoakTestIteration( } void InteropClient::PerformSoakTest( - const bool reset_channel_per_iteration, const int32_t soak_iterations, - const int32_t max_failures, + const std::string& server_uri, const bool reset_channel_per_iteration, + const int32_t soak_iterations, const int32_t max_failures, const int32_t max_acceptable_per_iteration_latency_ms, const int32_t min_time_ms_between_rpcs, const int32_t overall_timeout_seconds) { @@ -1086,12 +1086,16 @@ void InteropClient::PerformSoakTest( results.push_back(result); if (!success) { gpr_log(GPR_DEBUG, - "soak iteration: %d elapsed_ms: %d peer: %s failed: %s", i, - elapsed_ms, peer.c_str(), debug_string.c_str()); + "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s " + "failed: %s", + i, elapsed_ms, peer.c_str(), server_uri.c_str(), + debug_string.c_str()); total_failures++; } else { - gpr_log(GPR_DEBUG, "soak iteration: %d elapsed_ms: %d peer: %s succeeded", - i, elapsed_ms, peer.c_str()); + gpr_log( + GPR_DEBUG, + "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded", + i, elapsed_ms, peer.c_str(), server_uri.c_str()); } grpc_histogram_add(latencies_ms_histogram, std::get<1>(result)); iterations_ran++; @@ -1106,7 +1110,8 @@ void InteropClient::PerformSoakTest( if (iterations_ran < soak_iterations) { gpr_log( GPR_ERROR, - "soak test consumed all %d seconds of time and quit early, only " + "(server_uri: %s) soak test consumed all %d seconds of time and quit " + "early, only " "having ran %d out of desired %d iterations. " "total_failures: %d. " "max_failures_threshold: %d. " @@ -1116,58 +1121,62 @@ void InteropClient::PerformSoakTest( "Some or all of the iterations that did run were unexpectedly slow. " "See breakdown above for which iterations succeeded, failed, and " "why for more info.", - overall_timeout_seconds, iterations_ran, soak_iterations, - total_failures, max_failures, latency_ms_median, latency_ms_90th, - latency_ms_worst); + server_uri.c_str(), overall_timeout_seconds, iterations_ran, + soak_iterations, total_failures, max_failures, latency_ms_median, + latency_ms_90th, latency_ms_worst); GPR_ASSERT(0); } else if (total_failures > max_failures) { gpr_log(GPR_ERROR, - "soak test ran: %d iterations. total_failures: %d exceeds " + "(server_uri: %s) soak test ran: %d iterations. total_failures: %d " + "exceeds " "max_failures_threshold: %d. " "median_soak_iteration_latency: %lf ms. " "90th_soak_iteration_latency: %lf ms. " "worst_soak_iteration_latency: %lf ms. " "See breakdown above for which iterations succeeded, failed, and " "why for more info.", - soak_iterations, total_failures, max_failures, latency_ms_median, - latency_ms_90th, latency_ms_worst); + server_uri.c_str(), soak_iterations, total_failures, max_failures, + latency_ms_median, latency_ms_90th, latency_ms_worst); GPR_ASSERT(0); } else { gpr_log(GPR_INFO, - "soak test ran: %d iterations. total_failures: %d is within " + "(server_uri: %s) soak test ran: %d iterations. total_failures: %d " + "is within " "max_failures_threshold: %d. " "median_soak_iteration_latency: %lf ms. " "90th_soak_iteration_latency: %lf ms. " "worst_soak_iteration_latency: %lf ms. " "See breakdown above for which iterations succeeded, failed, and " "why for more info.", - soak_iterations, total_failures, max_failures, latency_ms_median, - latency_ms_90th, latency_ms_worst); + server_uri.c_str(), soak_iterations, total_failures, max_failures, + latency_ms_median, latency_ms_90th, latency_ms_worst); } } bool InteropClient::DoRpcSoakTest( - int32_t soak_iterations, int32_t max_failures, - int64_t max_acceptable_per_iteration_latency_ms, + const std::string& server_uri, int32_t soak_iterations, + int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms, int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds) { gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations); GPR_ASSERT(soak_iterations > 0); - PerformSoakTest(false /* reset channel per iteration */, soak_iterations, - max_failures, max_acceptable_per_iteration_latency_ms, + PerformSoakTest(server_uri, false /* reset channel per iteration */, + soak_iterations, max_failures, + max_acceptable_per_iteration_latency_ms, soak_min_time_ms_between_rpcs, overall_timeout_seconds); gpr_log(GPR_DEBUG, "rpc_soak test done."); return true; } bool InteropClient::DoChannelSoakTest( - int32_t soak_iterations, int32_t max_failures, - int64_t max_acceptable_per_iteration_latency_ms, + const std::string& server_uri, int32_t soak_iterations, + int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms, int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds) { gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...", soak_iterations); GPR_ASSERT(soak_iterations > 0); - PerformSoakTest(true /* reset channel per iteration */, soak_iterations, - max_failures, max_acceptable_per_iteration_latency_ms, + PerformSoakTest(server_uri, true /* reset channel per iteration */, + soak_iterations, max_failures, + max_acceptable_per_iteration_latency_ms, soak_min_time_ms_between_rpcs, overall_timeout_seconds); gpr_log(GPR_DEBUG, "channel_soak test done."); return true; diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 132c6aba148..27c276290a7 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -78,11 +78,13 @@ class InteropClient { // not implemented cross-language. They are considered experimental for now, // but at some point in the future, might be codified and implemented in all // languages - bool DoChannelSoakTest(int32_t soak_iterations, int32_t max_failures, + bool DoChannelSoakTest(const std::string& server_uri, int32_t soak_iterations, + int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms, int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds); - bool DoRpcSoakTest(int32_t soak_iterations, int32_t max_failures, + bool DoRpcSoakTest(const std::string& server_uri, int32_t soak_iterations, + int32_t max_failures, int64_t max_acceptable_per_iteration_latency_ms, int32_t soak_min_time_ms_between_rpcs, int32_t overall_timeout_seconds); @@ -141,7 +143,8 @@ class InteropClient { const bool reset_channel, const int32_t max_acceptable_per_iteration_latency_ms); - void PerformSoakTest(const bool reset_channel_per_iteration, + void PerformSoakTest(const std::string& server_uri, + const bool reset_channel_per_iteration, const int32_t soak_iterations, const int32_t max_failures, const int32_t max_acceptable_per_iteration_latency_ms, diff --git a/test/cpp/interop/xds_federation_client.cc b/test/cpp/interop/xds_federation_client.cc new file mode 100644 index 00000000000..9af58c2fcc3 --- /dev/null +++ b/test/cpp/interop/xds_federation_client.cc @@ -0,0 +1,122 @@ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include +#include + +#include "absl/flags/flag.h" +#include "absl/strings/str_split.h" + +#include +#include +#include + +#include "src/core/lib/gpr/string.h" +#include "test/core/util/test_config.h" +#include "test/cpp/interop/client_helper.h" +#include "test/cpp/interop/interop_client.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/util/test_config.h" + +ABSL_FLAG(std::string, server_uris, "", + "Comma-separated list of sever URIs to make RPCs to"); +ABSL_FLAG(std::string, credentials_types, "", + "Comma-separated list of credentials, each entry is used for the " + "server of the corresponding index in server_uris. Supported values: " + "compute_engine_channel_creds, INSECURE_CREDENTIALS"); +ABSL_FLAG(int32_t, soak_iterations, 10, + "The number of iterations to use for the two soak tests: rpc_soak " + "and channel_soak"); +ABSL_FLAG(int32_t, soak_max_failures, 0, + "The number of iterations in soak tests that are allowed to fail " + "(either due to non-OK status code or exceeding the per-iteration " + "max acceptable latency)."); +ABSL_FLAG(int32_t, soak_per_iteration_max_acceptable_latency_ms, 1000, + "The number of milliseconds a single iteration in the two soak tests " + "(rpc_soak and channel_soak) should take."); +ABSL_FLAG( + int32_t, soak_overall_timeout_seconds, 10, + "The overall number of seconds after which a soak test should stop and " + "fail, if the desired number of iterations have not yet completed."); +ABSL_FLAG(int32_t, soak_min_time_ms_between_rpcs, 0, + "The minimum time in milliseconds between consecutive RPCs in a soak " + "test (rpc_soak or channel_soak), useful for limiting QPS"); +ABSL_FLAG(std::string, test_case, "rpc_soak", + "Configure different test cases. Valid options are: " + "rpc_soak: sends --soak_iterations large_unary RPCs; " + "channel_soak: sends --soak_iterations RPCs, rebuilding the channel " + "each time"); + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + grpc::testing::InitTest(&argc, &argv, true); + gpr_log(GPR_INFO, "Testing these cases: %s", + absl::GetFlag(FLAGS_test_case).c_str()); + std::string test_case = absl::GetFlag(FLAGS_test_case); + // validate flags + std::vector uris = + absl::StrSplit(absl::GetFlag(FLAGS_server_uris), ','); + std::vector creds = + absl::StrSplit(absl::GetFlag(FLAGS_credentials_types), ','); + if (uris.size() != creds.size()) { + gpr_log(GPR_ERROR, + "Number of entries in --server_uris %ld != number of entries in " + "--credentials_types %ld", + uris.size(), creds.size()); + GPR_ASSERT(0); + } + if (uris.empty()) { + gpr_log(GPR_ERROR, "--server_uris has zero entries"); + GPR_ASSERT(0); + } + // construct and start clients + std::vector threads; + for (size_t i = 0; i < uris.size(); i++) { + threads.push_back(std::thread([uris, creds, i, test_case]() { + auto channel_creation_func = [uris, creds, i]() { + return grpc::CreateTestChannel(uris[i], creds[i], + nullptr /* call creds */); + }; + grpc::testing::InteropClient client(channel_creation_func, true, false); + if (test_case == "rpc_soak") { + client.DoRpcSoakTest( + uris[i], absl::GetFlag(FLAGS_soak_iterations), + absl::GetFlag(FLAGS_soak_max_failures), + absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms), + absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs), + absl::GetFlag(FLAGS_soak_overall_timeout_seconds)); + } else if (test_case == "channel_soak") { + client.DoChannelSoakTest( + uris[i], absl::GetFlag(FLAGS_soak_iterations), + absl::GetFlag(FLAGS_soak_max_failures), + absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms), + absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs), + absl::GetFlag(FLAGS_soak_overall_timeout_seconds)); + } else { + gpr_log(GPR_ERROR, + "Invalid test case, must be either rpc_soak or channel_soak"); + GPR_ASSERT(0); + } + })); + } + for (auto& thd : threads) { + thd.join(); + } + gpr_log(GPR_INFO, "All clients done!"); + return 0; +}