[memory test] add benchmark for xDS channel with multiple addresses (#34902)

The plumbing is a little messy here, but I didn't see an obvious way to
improve it without a lot of work.
pull/34908/head
Mark D. Roth 1 year ago committed by GitHub
parent 920882fbd9
commit c5c46a1640
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      test/core/memory_usage/callback_client.cc
  2. 101
      test/core/memory_usage/memory_usage_test.cc
  3. 10
      tools/profiling/memory/memory_diff.py

@ -28,6 +28,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
@ -44,7 +45,7 @@
ABSL_FLAG(std::string, target, "", "Target host:port");
ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
ABSL_FLAG(int, server_pid, 99999, "Server's pid");
ABSL_FLAG(int, server_pid, 0, "Server's pid");
ABSL_FLAG(int, size, 50, "Number of channels");
std::shared_ptr<grpc::Channel> CreateChannelForTest(int index) {
@ -150,7 +151,9 @@ int main(int argc, char** argv) {
}
// Getting peak memory usage
long peak_server_memory = GetMemUsage(absl::GetFlag(FLAGS_server_pid));
long peak_server_memory = absl::GetFlag(FLAGS_server_pid) > 0
? GetMemUsage(absl::GetFlag(FLAGS_server_pid))
: 0;
long peak_client_memory = GetMemUsage();
// Checking that all channels are still open
@ -161,16 +164,23 @@ int main(int argc, char** argv) {
std::chrono::milliseconds(1)));
}
const char* prefix = "";
std::string prefix;
if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
if (absl::GetFlag(FLAGS_server_pid) == 0) {
absl::StrAppend(&prefix, "multi_address ");
}
printf("---------Client channel stats--------\n");
printf("%sclient channel memory usage: %f bytes per channel\n", prefix,
printf("%sclient channel memory usage: %f bytes per channel\n",
prefix.c_str(),
static_cast<double>(peak_client_memory - before_client_memory) / size *
1024);
printf("---------Server channel stats--------\n");
printf("%sserver channel memory usage: %f bytes per channel\n", prefix,
static_cast<double>(peak_server_memory - before_server_memory) / size *
1024);
if (absl::GetFlag(FLAGS_server_pid) > 0) {
printf("---------Server channel stats--------\n");
printf("%sserver channel memory usage: %f bytes per channel\n",
prefix.c_str(),
static_cast<double>(peak_server_memory - before_server_memory) /
size * 1024);
}
gpr_log(GPR_INFO, "Client Done");
return 0;
}

@ -33,6 +33,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "google/protobuf/wrappers.pb.h"
@ -48,6 +49,7 @@
#include "src/core/lib/gpr/subprocess.h"
#include "src/core/lib/gprpp/env.h"
#include "src/proto/grpc/testing/xds/v3/cluster.pb.h"
#include "src/proto/grpc/testing/xds/v3/health_check.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/resolve_localhost_ip46.h"
#include "test/core/util/test_config.h"
@ -56,9 +58,12 @@
using grpc::testing::XdsResourceUtils;
ABSL_FLAG(std::string, benchmark_names, "call,channel",
"Which benchmark to run"); // Default all benchmarks in order to
// trigger CI testing for each one
// Default all benchmarks in order to trigger CI testing for each one
ABSL_FLAG(std::string, benchmark_names, "",
"Which benchmark to run. If empty, defaults to 'call,channel' "
"if --use_xds is false, or 'call,channel,channel_multi_address' "
"if --use_xds is true.");
ABSL_FLAG(int, size, 1000, "Number of channels/calls");
ABSL_FLAG(std::string, scenario_config, "insecure",
"Possible Values: minstack (Use minimal stack), resource_quota, "
@ -142,19 +147,22 @@ int RunCallBenchmark(int port, char* root,
}
// Per-channel benchmark
int RunChannelBenchmark(int port, char* root) {
int RunChannelBenchmark(const std::vector<int>& server_ports, char* root) {
// TODO(chennancy) Add the scenario specific flags
int status;
// start the server
gpr_log(GPR_INFO, "starting server");
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_callback_server",
gpr_subprocess_binary_extension()),
"--bind", grpc_core::LocalIpAndPort(port)};
if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
Subprocess svr(server_flags);
gpr_log(GPR_INFO, "server started, pid %d", svr.GetPID());
// start the servers
std::vector<Subprocess> servers;
servers.reserve(server_ports.size());
for (int port : server_ports) {
gpr_log(GPR_INFO, "starting server on port %d", port);
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_callback_server",
gpr_subprocess_binary_extension()),
"--bind", grpc_core::LocalIpAndPort(port)};
if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
servers.emplace_back(server_flags);
gpr_log(GPR_INFO, "server started, pid %d", servers.back().GetPID());
}
// Wait one second before starting client to avoid possible race condition
// of client sending an RPC before the server is set up
@ -168,19 +176,25 @@ int RunChannelBenchmark(int port, char* root) {
"--target",
absl::GetFlag(FLAGS_use_xds)
? absl::StrCat("xds:", XdsResourceUtils::kServerName)
: grpc_core::LocalIpAndPort(port),
"--nosecure",
absl::StrCat("--server_pid=", svr.GetPID()),
absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
: grpc_core::LocalIpAndPort(server_ports[0]),
"--nosecure", absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
if (server_ports.size() == 1) {
client_flags.emplace_back(
absl::StrCat("--server_pid=", servers[0].GetPID()));
}
Subprocess cli(client_flags);
gpr_log(GPR_INFO, "client started, pid %d", cli.GetPID());
// wait for completion
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
int retval = cli.Join();
if (retval != 0) {
printf("client failed with: %d", retval);
return 1;
}
svr.Interrupt();
return svr.Join() == 0 ? 0 : 2;
for (auto& server : servers) {
server.Interrupt();
if (server.Join() != 0) retval = 2;
}
return retval;
}
struct XdsServer {
@ -188,7 +202,8 @@ struct XdsServer {
std::unique_ptr<grpc::Server> server;
};
XdsServer StartXdsServerAndConfigureBootstrap(int server_port) {
XdsServer StartXdsServerAndConfigureBootstrap(
const std::vector<int>& server_ports) {
XdsServer xds_server;
int xds_server_port = grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "xDS server port: %d", xds_server_port);
@ -204,6 +219,14 @@ XdsServer StartXdsServerAndConfigureBootstrap(int server_port) {
xds_server.ads_service = std::make_shared<grpc::testing::AdsServiceImpl>();
xds_server.ads_service->Start();
// Populate xDS resources.
std::vector<XdsResourceUtils::EdsResourceArgs::Endpoint> endpoints;
endpoints.reserve(server_ports.size());
for (int port : server_ports) {
endpoints.emplace_back(port);
XdsResourceUtils::SetServerListenerNameAndRouteConfiguration(
xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(),
port, XdsResourceUtils::DefaultServerRouteConfig());
}
XdsResourceUtils::SetListenerAndRouteConfiguration(
xds_server.ads_service.get(), XdsResourceUtils::DefaultListener(),
XdsResourceUtils::DefaultRouteConfig());
@ -216,11 +239,7 @@ XdsServer StartXdsServerAndConfigureBootstrap(int server_port) {
xds_server.ads_service->SetEdsResource(
XdsResourceUtils::BuildEdsResource(XdsResourceUtils::EdsResourceArgs(
{XdsResourceUtils::EdsResourceArgs::Locality(
"here",
{XdsResourceUtils::EdsResourceArgs::Endpoint(server_port)})})));
XdsResourceUtils::SetServerListenerNameAndRouteConfiguration(
xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(),
server_port, XdsResourceUtils::DefaultServerRouteConfig());
"here", std::move(endpoints))})));
// Create and start server.
gpr_log(GPR_INFO, "starting xDS server...");
grpc::ServerBuilder builder;
@ -236,18 +255,24 @@ int RunBenchmark(char* root, absl::string_view benchmark,
std::vector<std::string> server_scenario_flags,
std::vector<std::string> client_scenario_flags) {
gpr_log(GPR_INFO, "running benchmark: %s", std::string(benchmark).c_str());
int server_port = grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "server port: %d", server_port);
const size_t num_ports = benchmark == "channel_multi_address" ? 10 : 1;
std::vector<int> server_ports;
server_ports.reserve(num_ports);
for (size_t i = 0; i < num_ports; ++i) {
server_ports.push_back(grpc_pick_unused_port_or_die());
}
gpr_log(GPR_INFO, "server ports: %s",
absl::StrJoin(server_ports, ",").c_str());
XdsServer xds_server;
if (absl::GetFlag(FLAGS_use_xds)) {
xds_server = StartXdsServerAndConfigureBootstrap(server_port);
xds_server = StartXdsServerAndConfigureBootstrap(server_ports);
}
int retval;
if (benchmark == "call") {
retval = RunCallBenchmark(server_port, root, server_scenario_flags,
retval = RunCallBenchmark(server_ports[0], root, server_scenario_flags,
client_scenario_flags);
} else if (benchmark == "channel") {
retval = RunChannelBenchmark(server_port, root);
} else if (benchmark == "channel" || benchmark == "channel_multi_address") {
retval = RunChannelBenchmark(server_ports, root);
} else {
gpr_log(GPR_INFO, "Not a valid benchmark name");
retval = 4;
@ -293,7 +318,13 @@ int main(int argc, char** argv) {
// Run all benchmarks listed (Multiple benchmarks usually only for default
// scenario)
auto benchmarks = absl::StrSplit(absl::GetFlag(FLAGS_benchmark_names), ',');
std::string benchmark_names = absl::GetFlag(FLAGS_benchmark_names);
if (benchmark_names.empty()) {
benchmark_names = absl::GetFlag(FLAGS_use_xds)
? "call,channel,channel_multi_address"
: "call,channel";
}
auto benchmarks = absl::StrSplit(benchmark_names, ',');
grpc_init();
for (const auto& benchmark : benchmarks) {
int r = RunBenchmark(root, benchmark, it_scenario->second.server,

@ -79,6 +79,10 @@ _INTERESTING = {
rb"xds server channel memory usage: ([0-9\.]+) bytes per channel",
float,
),
"channel_multi_address/xds_client": (
rb"xds multi_address client channel memory usage: ([0-9\.]+) bytes per channel",
float,
),
}
_SCENARIOS = {
@ -89,6 +93,10 @@ _SCENARIOS = {
_BENCHMARKS = {
"call": ["--benchmark_names=call", "--size=50000"],
"channel": ["--benchmark_names=channel", "--size=10000"],
"channel_multi_address": [
"--benchmark_names=channel_multi_address",
"--size=10000",
],
}
@ -110,6 +118,8 @@ def _run():
# TODO(chenancy) Remove when minstack is implemented for channel
if name == "channel" and scenario == "minstack":
continue
if name == "channel_multi_address" and not use_xds:
continue
argv = (
["bazel-bin/test/core/memory_usage/memory_usage_test"]
+ benchmark_args

Loading…
Cancel
Save