diff --git a/test/core/memory_usage/BUILD b/test/core/memory_usage/BUILD index 946cbc65d23..fcb08e3ac81 100644 --- a/test/core/memory_usage/BUILD +++ b/test/core/memory_usage/BUILD @@ -67,10 +67,62 @@ grpc_cc_binary( ], ) +grpc_cc_binary( + name = "memory_usage_callback_client", + testonly = 1, + srcs = ["callback_client.cc"], + external_deps = [ + "absl/flags:flag", + "absl/flags:parse", + "gtest", + ], + tags = [ + "bazel_only", + "no_mac", + "no_windows", + ], + deps = [ + ":memstats", + "//:gpr", + "//:grpc", + "//:grpc++", + "//src/proto/grpc/testing:benchmark_service_proto", + "//test/core/end2end:ssl_test_data", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_binary( + name = "memory_usage_callback_server", + testonly = 1, + srcs = ["callback_server.cc"], + external_deps = [ + "absl/flags:flag", + "absl/flags:parse", + "gtest", + ], + tags = [ + "bazel_only", + "no_mac", + "no_windows", + ], + deps = [ + ":memstats", + "//:gpr", + "//:grpc", + "//:grpc++", + "//src/proto/grpc/testing:benchmark_service_proto", + "//test/core/end2end:ssl_test_data", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "memory_usage_test", srcs = ["memory_usage_test.cc"], data = [ + ":memory_usage_callback_client", + ":memory_usage_callback_server", ":memory_usage_client", ":memory_usage_server", ], diff --git a/test/core/memory_usage/callback_client.cc b/test/core/memory_usage/callback_client.cc new file mode 100644 index 00000000000..e16b10932a8 --- /dev/null +++ b/test/core/memory_usage/callback_client.cc @@ -0,0 +1,106 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +#include + +#include "absl/algorithm/container.h" +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "util/logging.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/cpp/client/secure_credentials.h" +#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" +#include "test/core/memory_usage/memstats.h" +#include "test/core/util/port.h" +#include "test/core/util/subprocess.h" +#include "test/core/util/test_config.h" + +ABSL_FLAG(std::string, target, "", "Target host:port"); +ABSL_FLAG(bool, secure, false, "Use SSL Credentials"); + +void UnaryCall() { + // Set the authentication mechanism. + std::shared_ptr creds = + grpc::InsecureChannelCredentials(); + if (absl::GetFlag(FLAGS_secure)) { + // TODO (chennancy) Add in secure credentials + gpr_log(GPR_INFO, "Supposed to be secure, is not yet"); + } + + // Create a channel to the server and a stub + std::shared_ptr channel = + CreateChannel(absl::GetFlag(FLAGS_target), creds); + std::unique_ptr stub = + grpc::testing::BenchmarkService::NewStub(channel); + + // Start a call. + struct CallParams { + grpc::ClientContext context; + grpc::testing::SimpleRequest request; + grpc::testing::SimpleResponse response; + }; + CallParams* params = new CallParams(); + stub->async()->UnaryCall(¶ms->context, ¶ms->request, + ¶ms->response, [](const grpc::Status& status) { + if (status.ok()) { + gpr_log(GPR_INFO, "UnaryCall RPC succeeded."); + } else { + gpr_log(GPR_ERROR, "UnaryCall RPC failed."); + } + }); +} + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + char* fake_argv[1]; + GPR_ASSERT(argc >= 1); + fake_argv[0] = argv[0]; + grpc::testing::TestEnvironment env(&argc, argv); + if (absl::GetFlag(FLAGS_target).empty()) { + gpr_log(GPR_ERROR, "Client: No target port entered"); + return 1; + } + gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str()); + + UnaryCall(); + gpr_log(GPR_INFO, "Client Done"); + return 0; +} diff --git a/test/core/memory_usage/callback_server.cc b/test/core/memory_usage/callback_server.cc new file mode 100644 index 00000000000..a8b63bdcbf2 --- /dev/null +++ b/test/core/memory_usage/callback_server.cc @@ -0,0 +1,107 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +#include + +#include + +#include "absl/algorithm/container.h" +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "util/logging.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/cpp/client/secure_credentials.h" +#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" +#include "test/core/memory_usage/memstats.h" +#include "test/core/util/port.h" +#include "test/core/util/subprocess.h" +#include "test/core/util/test_config.h" + +class ServerCallbackImpl final + : public grpc::testing::BenchmarkService::CallbackService { + grpc::ServerUnaryReactor* UnaryCall( + grpc::CallbackServerContext* context, + const grpc::testing::SimpleRequest* request, + grpc::testing::SimpleResponse* response) override { + gpr_log(GPR_INFO, "RPC CALL RECEIVED"); + auto* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; + } +}; + +/* We have some sort of deadlock, so let's not exit gracefully for now. + TODO(chennancy) Add graceful shutdown */ +static void sigint_handler(int /*x*/) { _exit(0); } + +ABSL_FLAG(std::string, bind, "", "Bind host:port"); +ABSL_FLAG(bool, secure, false, "Use SSL Credentials"); + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + char* fake_argv[1]; + GPR_ASSERT(argc >= 1); + fake_argv[0] = argv[0]; + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + signal(SIGINT, sigint_handler); + std::string server_address = absl::GetFlag(FLAGS_bind); + if (server_address.empty()) { + gpr_log(GPR_ERROR, "Server: No port entered"); + return 1; + } + gpr_log(GPR_INFO, "Server port: %s", server_address.c_str()); + + ServerCallbackImpl callback_server; + grpc::ServerBuilder builder; + // Set the authentication mechanism. + std::shared_ptr creds = + grpc::InsecureServerCredentials(); + if (absl::GetFlag(FLAGS_secure)) { + gpr_log(GPR_INFO, "Supposed to be secure, is not yet"); + // TODO (chennancy) Add in secure credentials + } + builder.AddListeningPort(server_address, creds); + builder.RegisterService(&callback_server); + + // Set up the server to start accepting requests. + std::shared_ptr server(builder.BuildAndStart()); + gpr_log(GPR_INFO, "Server listening on %s", server_address.c_str()); + + // Keep the program running until the server shuts down. + server->Wait(); + return 0; +} diff --git a/test/core/memory_usage/memory_usage_test.cc b/test/core/memory_usage/memory_usage_test.cc index 87e99f19c95..4c66f34754f 100644 --- a/test/core/memory_usage/memory_usage_test.cc +++ b/test/core/memory_usage/memory_usage_test.cc @@ -25,18 +25,24 @@ #include "absl/flags/flag.h" #include "absl/flags/parse.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_split.h" +#include "util/logging.h" #include #include #include +#include #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/host_port.h" #include "test/core/util/port.h" #include "test/core/util/subprocess.h" +#include "test/core/util/test_config.h" -ABSL_FLAG(std::string, benchmark_name, "call", "Which benchmark to run"); -ABSL_FLAG(int, size, 50000, "Number of channels/calls"); +ABSL_FLAG(std::string, benchmark_names, "call,channel", + "Which benchmark to run"); // Default all benchmarks in order to + // trigger CI testing for each one +ABSL_FLAG(int, size, 1000, "Number of channels/calls"); ABSL_FLAG(std::string, scenario_config, "insecure", "Possible Values: minstack (Use minimal stack), resource_quota, " "secure (Use SSL credentials on server)"); @@ -63,15 +69,94 @@ class Subprocess { gpr_subprocess* process_; }; +/* per-call memory usage benchmark */ +int RunCallBenchmark(char* root, std::vector server_scenario_flags, + std::vector client_scenario_flags) { + int status; + int port = grpc_pick_unused_port_or_die(); + + /* start the server */ + std::vector server_flags = { + absl::StrCat(root, "/memory_usage_server", + gpr_subprocess_binary_extension()), + "--bind", grpc_core::JoinHostPort("::", port)}; + // Add scenario-specific server flags to the end of the server_flags + absl::c_move(server_scenario_flags, std::back_inserter(server_flags)); + Subprocess svr(server_flags); + + /* start the client */ + std::vector client_flags = { + absl::StrCat(root, "/memory_usage_client", + gpr_subprocess_binary_extension()), + "--target", grpc_core::JoinHostPort("127.0.0.1", port), + absl::StrCat("--warmup=", 10000), + absl::StrCat("--benchmark=", absl::GetFlag(FLAGS_size))}; + // Add scenario-specific client flags to the end of the client_flags + absl::c_move(client_scenario_flags, std::back_inserter(client_flags)); + Subprocess cli(client_flags); + /* wait for completion */ + if ((status = cli.Join()) != 0) { + printf("client failed with: %d", status); + return 1; + } + + svr.Interrupt(); + return svr.Join() == 0 ? 0 : 2; +} + +/* Per-channel benchmark*/ +int RunChannelBenchmark(char* root) { + // TODO(chennancy) Add the scenario specific flags + int status; + int port = grpc_pick_unused_port_or_die(); + + /* start the server */ + std::vector server_flags = { + absl::StrCat(root, "/memory_usage_callback_server", + gpr_subprocess_binary_extension()), + "--bind", grpc_core::JoinHostPort("::", port)}; + Subprocess svr(server_flags); + + // Wait one second before starting client to avoid possible race condition + // of client sending an RPC before the server is set up + gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); + + /* start the client */ + std::vector client_flags = { + absl::StrCat(root, "/memory_usage_callback_client", + gpr_subprocess_binary_extension()), + "--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure"}; + Subprocess cli(client_flags); + /* wait for completion */ + if ((status = cli.Join()) != 0) { + printf("client failed with: %d", status); + return 1; + } + svr.Interrupt(); + return svr.Join() == 0 ? 0 : 2; +} + +int RunBenchmark(char* root, absl::string_view benchmark, + std::vector server_scenario_flags, + std::vector client_scenario_flags) { + if (benchmark == "call") { + return RunCallBenchmark(root, server_scenario_flags, client_scenario_flags); + } else if (benchmark == "channel") { + return RunChannelBenchmark(root); + } else { + gpr_log(GPR_INFO, "Not a valid benchmark name"); + return 4; + } +} + int main(int argc, char** argv) { absl::ParseCommandLine(argc, argv); char* me = argv[0]; char* lslash = strrchr(me, '/'); char root[1024]; - int port = grpc_pick_unused_port_or_die(); + std::vector args; - int status; /* figure out where we are */ if (lslash) { memcpy(root, me, static_cast(lslash - me)); @@ -98,37 +183,13 @@ int main(int argc, char** argv) { return 3; } - /* per-call memory usage benchmark */ - if (absl::GetFlag(FLAGS_benchmark_name) == "call") { - /* start the server */ - std::vector server_flags = { - absl::StrCat(root, "/memory_usage_server", - gpr_subprocess_binary_extension()), - "--bind", grpc_core::JoinHostPort("::", port)}; - // Add scenario-specific server flags to the end of the server_flags - absl::c_move(it_scenario->second.server, std::back_inserter(server_flags)); - Subprocess svr(server_flags); - - /* start the client */ - std::vector client_flags = { - absl::StrCat(root, "/memory_usage_client", - gpr_subprocess_binary_extension()), - "--target", grpc_core::JoinHostPort("127.0.0.1", port), - absl::StrCat("--warmup=", 10000), - absl::StrCat("--benchmark=", absl::GetFlag(FLAGS_size))}; - // Add scenario-specific client flags to the end of the client_flags - absl::c_move(it_scenario->second.client, std::back_inserter(client_flags)); - Subprocess cli(client_flags); - /* wait for completion */ - if ((status = cli.Join()) != 0) { - printf("client failed with: %d", status); - return 1; - } - - svr.Interrupt(); - return svr.Join() == 0 ? 0 : 2; + // Run all benchmarks listed (Multiple benchmarks usually only for default + // scenario) + auto benchmarks = absl::StrSplit(absl::GetFlag(FLAGS_benchmark_names), ','); + for (const auto& benchmark : benchmarks) { + int r = RunBenchmark(root, benchmark, it_scenario->second.server, + it_scenario->second.client); + if (r != 0) return r; } - - printf("Command line args couldn't be parsed\n"); - return 4; + return 0; } diff --git a/tools/profiling/memory/memory_diff.py b/tools/profiling/memory/memory_diff.py index adf7d78323b..981223f3403 100755 --- a/tools/profiling/memory/memory_diff.py +++ b/tools/profiling/memory/memory_diff.py @@ -66,6 +66,8 @@ def _run(): try: output = subprocess.check_output([ 'bazel-bin/test/core/memory_usage/memory_usage_test', + '--benchmark_names=call', + '--size=50000', ] + extra_args) except subprocess.CalledProcessError as e: print('Error running benchmark:', e)