@ -16,12 +16,15 @@ |
#include <stdint.h> |
#include <stdio.h> |
#include <string.h> |
#include <algorithm> |
#include <iterator> |
#include <limits> |
#include <map> |
#include <memory> |
#include <string> |
#include <utility> |
#include <vector> |
@ -32,15 +35,26 @@ |
#include "absl/strings/str_cat.h" |
#include "absl/strings/str_split.h" |
#include "absl/strings/string_view.h" |
#include "google/protobuf/wrappers.pb.h" |
#include <grpc/grpc.h> |
#include <grpc/support/log.h> |
#include <grpc/support/time.h> |
#include <grpcpp/security/server_credentials.h> |
#include <grpcpp/server.h> |
#include <grpcpp/server_builder.h> |
#include "src/core/lib/config/config_vars.h" |
#include "src/core/lib/gpr/subprocess.h" |
#include "src/core/lib/gprpp/host_port.h" |
#include "src/core/lib/gprpp/env.h" |
#include "src/proto/grpc/testing/xds/v3/cluster.pb.h" |
#include "test/core/util/port.h" |
#include "test/core/util/resolve_localhost_ip46.h" |
#include "test/core/util/test_config.h" |
#include "test/cpp/end2end/xds/xds_server.h" |
#include "test/cpp/end2end/xds/xds_utils.h" |
using grpc::testing::XdsResourceUtils; |
ABSL_FLAG(std::string, benchmark_names, "call,channel", |
"Which benchmark to run"); // Default all benchmarks in order to
@ -51,6 +65,9 @@ ABSL_FLAG(std::string, scenario_config, "insecure", |
"secure (Use SSL credentials on server)"); |
ABSL_FLAG(bool, memory_profiling, false, |
"Run memory profiling"); // TODO (chennancy) Connect this flag
ABSL_FLAG(bool, use_xds, false, "Use xDS"); |
// TODO(roth, ctiller): Add support for multiple addresses per channel.
class Subprocess { |
public: |
@ -74,28 +91,38 @@ class Subprocess { |
}; |
// per-call memory usage benchmark
int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags, |
int RunCallBenchmark(int port, char* root, |
std::vector<std::string> server_scenario_flags, |
std::vector<std::string> client_scenario_flags) { |
int status; |
int port = grpc_pick_unused_port_or_die(); |
// start the server
gpr_log(GPR_INFO, "starting server"); |
std::vector<std::string> server_flags = { |
absl::StrCat(root, "/memory_usage_server", |
gpr_subprocess_binary_extension()), |
"--grpc_experiments", |
std::string(grpc_core::ConfigVars::Get().Experiments()), "--bind", |
grpc_core::JoinHostPort("::", port)}; |
grpc_core::LocalIpAndPort(port)}; |
if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds"); |
// Add scenario-specific server flags to the end of the server_flags
absl::c_move(server_scenario_flags, std::back_inserter(server_flags)); |
Subprocess svr(server_flags); |
gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID()); |
// Wait one second before starting client to give the server a chance
// to start up.
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); |
// start the client
gpr_log(GPR_INFO, "starting client"); |
std::vector<std::string> client_flags = { |
absl::StrCat(root, "/memory_usage_client", |
gpr_subprocess_binary_extension()), |
"--target", |
grpc_core::JoinHostPort("localhost", port), |
absl::GetFlag(FLAGS_use_xds) |
? absl::StrCat("xds:", XdsResourceUtils::kServerName) |
: grpc_core::LocalIpAndPort(port), |
"--grpc_experiments", |
std::string(grpc_core::ConfigVars::Get().Experiments()), |
absl::StrCat("--warmup=", 10000), |
@ -103,6 +130,7 @@ int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags, |
// Add scenario-specific client flags to the end of the client_flags
absl::c_move(client_scenario_flags, std::back_inserter(client_flags)); |
Subprocess cli(client_flags); |
gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID()); |
// wait for completion
if ((status = cli.Join()) != 0) { |
printf("client failed with: %d", status); |
@ -114,32 +142,38 @@ int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags, |
} |
// Per-channel benchmark
int RunChannelBenchmark(char* root) { |
int RunChannelBenchmark(int port, char* root) { |
// TODO(chennancy) Add the scenario specific flags
int status; |
int port = grpc_pick_unused_port_or_die(); |
// start the server
gpr_log(GPR_INFO, "starting server"); |
std::vector<std::string> server_flags = { |
absl::StrCat(root, "/memory_usage_callback_server", |
gpr_subprocess_binary_extension()), |
"--bind", grpc_core::JoinHostPort("::", port)}; |
"--bind", grpc_core::LocalIpAndPort(port)}; |
if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds"); |
Subprocess svr(server_flags); |
gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID()); |
// Wait one second before starting client to avoid possible race condition
// of client sending an RPC before the server is set up
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); |
// start the client
gpr_log(GPR_INFO, "starting client"); |
std::vector<std::string> client_flags = { |
absl::StrCat(root, "/memory_usage_callback_client", |
gpr_subprocess_binary_extension()), |
"--target", |
grpc_core::JoinHostPort("localhost", port), |
absl::GetFlag(FLAGS_use_xds) |
? absl::StrCat("xds:", XdsResourceUtils::kServerName) |
: grpc_core::LocalIpAndPort(port), |
"--nosecure", |
absl::StrCat("--server_pid=", svr.GetPID()), |
absl::StrCat("--size=", absl::GetFlag(FLAGS_size))}; |
Subprocess cli(client_flags); |
gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID()); |
// wait for completion
if ((status = cli.Join()) != 0) { |
printf("client failed with: %d", status); |
@ -149,17 +183,78 @@ int RunChannelBenchmark(char* root) { |
return svr.Join() == 0 ? 0 : 2; |
} |
struct XdsServer { |
std::shared_ptr<grpc::testing::AdsServiceImpl> ads_service; |
std::unique_ptr<grpc::Server> server; |
}; |
XdsServer StartXdsServerAndConfigureBootstrap(int server_port) { |
XdsServer xds_server; |
int xds_server_port = grpc_pick_unused_port_or_die(); |
gpr_log(GPR_INFO, "xDS server port: %d", xds_server_port); |
// Generate xDS bootstrap and set the env var.
std::string bootstrap = |
grpc::testing::XdsBootstrapBuilder() |
.SetDefaultServer(absl::StrCat("localhost:", xds_server_port)) |
.SetXdsChannelCredentials("insecure") |
.Build(); |
grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap); |
gpr_log(GPR_INFO, "xDS bootstrap: %s", bootstrap.c_str()); |
// Create ADS service.
xds_server.ads_service = std::make_shared<grpc::testing::AdsServiceImpl>(); |
xds_server.ads_service->Start(); |
// Populate xDS resources.
XdsResourceUtils::SetListenerAndRouteConfiguration( |
xds_server.ads_service.get(), XdsResourceUtils::DefaultListener(), |
XdsResourceUtils::DefaultRouteConfig()); |
auto cluster = XdsResourceUtils::DefaultCluster(); |
cluster.mutable_circuit_breakers() |
->add_thresholds() |
->mutable_max_requests() |
->set_value(std::numeric_limits<uint32_t>::max()); |
xds_server.ads_service->SetCdsResource(cluster); |
xds_server.ads_service->SetEdsResource( |
XdsResourceUtils::BuildEdsResource(XdsResourceUtils::EdsResourceArgs( |
{XdsResourceUtils::EdsResourceArgs::Locality( |
"here", |
{XdsResourceUtils::EdsResourceArgs::Endpoint(server_port)})}))); |
XdsResourceUtils::SetServerListenerNameAndRouteConfiguration( |
xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(), |
server_port, XdsResourceUtils::DefaultServerRouteConfig()); |
// Create and start server.
gpr_log(GPR_INFO, "starting xDS server..."); |
grpc::ServerBuilder builder; |
builder.RegisterService(xds_server.ads_service.get()); |
builder.AddListeningPort(absl::StrCat("localhost:", xds_server_port), |
grpc::InsecureServerCredentials()); |
xds_server.server = builder.BuildAndStart(); |
gpr_log(GPR_INFO, "xDS server started"); |
return xds_server; |
} |
int RunBenchmark(char* root, absl::string_view benchmark, |
std::vector<std::string> server_scenario_flags, |
std::vector<std::string> client_scenario_flags) { |
gpr_log(GPR_INFO, "running benchmark: %s", std::string(benchmark).c_str()); |
int server_port = grpc_pick_unused_port_or_die(); |
gpr_log(GPR_INFO, "server port: %d", server_port); |
XdsServer xds_server; |
if (absl::GetFlag(FLAGS_use_xds)) { |
xds_server = StartXdsServerAndConfigureBootstrap(server_port); |
} |
int retval; |
if (benchmark == "call") { |
return RunCallBenchmark(root, server_scenario_flags, client_scenario_flags); |
retval = RunCallBenchmark(server_port, root, server_scenario_flags, |
client_scenario_flags); |
} else if (benchmark == "channel") { |
return RunChannelBenchmark(root); |
retval = RunChannelBenchmark(server_port, root); |
} else { |
gpr_log(GPR_INFO, "Not a valid benchmark name"); |
return 4; |
retval = 4; |
} |
if (xds_server.server != nullptr) xds_server.server->Shutdown(); |
gpr_log(GPR_INFO, "done running benchmark"); |
return retval; |
} |
int main(int argc, char** argv) { |
@ -199,10 +294,12 @@ int main(int argc, char** argv) { |
// Run all benchmarks listed (Multiple benchmarks usually only for default
// scenario)
auto benchmarks = absl::StrSplit(absl::GetFlag(FLAGS_benchmark_names), ','); |
grpc_init(); |
for (const auto& benchmark : benchmarks) { |
int r = RunBenchmark(root, benchmark, it_scenario->second.server, |
it_scenario->second.client); |
if (r != 0) return r; |
} |
grpc_shutdown(); |
return 0; |
} |