diff --git a/test/core/memory_usage/BUILD b/test/core/memory_usage/BUILD index d16a4a5b153..afca9ca49a8 100644 --- a/test/core/memory_usage/BUILD +++ b/test/core/memory_usage/BUILD @@ -91,6 +91,7 @@ grpc_cc_binary( external_deps = [ "absl/flags:flag", "absl/flags:parse", + "absl/time", "gtest", ], tags = [ diff --git a/test/core/memory_usage/callback_client.cc b/test/core/memory_usage/callback_client.cc index cdc2d1305eb..60d93665eb0 100644 --- a/test/core/memory_usage/callback_client.cc +++ b/test/core/memory_usage/callback_client.cc @@ -16,16 +16,23 @@ * */ +#include + +#include #include #include +#include #include "absl/flags/flag.h" #include "absl/flags/parse.h" #include "absl/strings/string_view.h" +#include "absl/synchronization/notification.h" +#include "absl/utility/utility.h" #include #include #include +#include #include #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" @@ -36,8 +43,9 @@ ABSL_FLAG(std::string, target, "", "Target host:port"); ABSL_FLAG(bool, secure, false, "Use SSL Credentials"); ABSL_FLAG(int, server_pid, 99999, "Server's pid"); +ABSL_FLAG(int, size, 50, "Number of channels"); -std::unique_ptr CreateStubForTest() { +std::shared_ptr CreateChannelForTest(int index) { // Set the authentication mechanism. std::shared_ptr creds = grpc::InsecureChannelCredentials(); @@ -46,58 +54,68 @@ std::unique_ptr CreateStubForTest() { gpr_log(GPR_INFO, "Supposed to be secure, is not yet"); } + // Channel args to prevent connection from closing after RPC is done + grpc::ChannelArguments channel_args; + channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_IDLE_MS, INT_MAX); + channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_AGE_MS, INT_MAX); + // Arg to bypass mechanism that combines channels on the serverside if they + // have the same channel args. Allows for one channel per connection + channel_args.SetInt("grpc.memory_usage_counter", index); + // Create a channel to the server and a stub std::shared_ptr channel = - CreateChannel(absl::GetFlag(FLAGS_target), creds); - std::unique_ptr stub = - grpc::testing::BenchmarkService::NewStub(channel); - return stub; + CreateCustomChannel(absl::GetFlag(FLAGS_target), creds, channel_args); + return channel; } -void UnaryCall() { +struct CallParams { + grpc::ClientContext context; + grpc::testing::SimpleRequest request; + grpc::testing::SimpleResponse response; + grpc::testing::MemorySize snapshot_response; + absl::Notification done; +}; + +// Simple Unary RPC to send to confirm connection is open +std::shared_ptr UnaryCall(std::shared_ptr channel) { std::unique_ptr stub = - CreateStubForTest(); + grpc::testing::BenchmarkService::NewStub(channel); // Start a call. - struct CallParams { - grpc::ClientContext context; - grpc::testing::SimpleRequest request; - grpc::testing::SimpleResponse response; - }; - CallParams* params = new CallParams(); + auto params = std::make_shared(); stub->async()->UnaryCall(¶ms->context, ¶ms->request, - ¶ms->response, [](const grpc::Status& status) { - if (status.ok()) { - gpr_log(GPR_INFO, "UnaryCall RPC succeeded."); - } else { + ¶ms->response, + [params](const grpc::Status& status) { + if (!status.ok()) { gpr_log(GPR_ERROR, "UnaryCall RPC failed."); } + params->done.Notify(); }); + return params; } // Get memory usage of server's process before the server is made -long GetBeforeSnapshot() { +std::shared_ptr GetBeforeSnapshot( + std::shared_ptr channel, long& before_server_memory) { std::unique_ptr stub = - CreateStubForTest(); + grpc::testing::BenchmarkService::NewStub(channel); // Start a call. - struct CallParams { - grpc::ClientContext context; - grpc::testing::SimpleRequest request; - grpc::testing::MemorySize response; - }; - CallParams* params = new CallParams(); + auto params = std::make_shared(); stub->async()->GetBeforeSnapshot( - ¶ms->context, ¶ms->request, ¶ms->response, - [params](const grpc::Status& status) { + ¶ms->context, ¶ms->request, ¶ms->snapshot_response, + [params, &before_server_memory](const grpc::Status& status) { if (status.ok()) { - gpr_log(GPR_INFO, "Before: %ld", params->response.rss()); + before_server_memory = params->snapshot_response.rss(); + gpr_log(GPR_INFO, "Server Before RPC: %ld", + params->snapshot_response.rss()); gpr_log(GPR_INFO, "GetBeforeSnapshot succeeded."); } else { gpr_log(GPR_ERROR, "GetBeforeSnapshot failed."); } + params->done.Notify(); }); - return params->response.rss(); + return params; } int main(int argc, char** argv) { @@ -111,20 +129,45 @@ int main(int argc, char** argv) { return 1; } gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str()); + gpr_log(GPR_INFO, "Client Size: %d", absl::GetFlag(FLAGS_size)); // Getting initial memory usage - long before_server_memory = GetBeforeSnapshot(); + std::shared_ptr get_memory_channel = CreateChannelForTest(0); + long before_server_memory; + GetBeforeSnapshot(get_memory_channel, before_server_memory) + ->done.WaitForNotification(); long before_client_memory = GetMemUsage(); - UnaryCall(); + // Create the channels and send an RPC to confirm they're open + int size = absl::GetFlag(FLAGS_size); + std::vector> channels_list(size); + for (int i = 0; i < size; ++i) { + std::shared_ptr channel = CreateChannelForTest(i); + channels_list[i] = channel; + UnaryCall(channel)->done.WaitForNotification(); + } // Getting peak memory usage long peak_server_memory = GetMemUsage(absl::GetFlag(FLAGS_server_pid)); long peak_client_memory = GetMemUsage(); gpr_log(GPR_INFO, "Before Server Mem: %ld", before_server_memory); gpr_log(GPR_INFO, "Before Client Mem: %ld", before_client_memory); - gpr_log(GPR_INFO, "Peak Client Mem: %ld", peak_client_memory); gpr_log(GPR_INFO, "Peak Server Mem: %ld", peak_server_memory); + gpr_log(GPR_INFO, "Peak Client Mem: %ld", peak_client_memory); + gpr_log(GPR_INFO, "Server Per Channel Memory: %f", + static_cast(peak_server_memory - before_server_memory) / + static_cast(size)); + gpr_log(GPR_INFO, "Client Per Channel Memory: %f", + static_cast(peak_client_memory - before_client_memory) / + static_cast(size)); + + // Checking that all channels are still open + for (int i = 0; i < size; ++i) { + GPR_ASSERT(!absl::exchange(channels_list[i], nullptr) + ->WaitForStateChange(GRPC_CHANNEL_READY, + std::chrono::system_clock::now() + + std::chrono::milliseconds(1))); + } gpr_log(GPR_INFO, "Client Done"); return 0; } diff --git a/test/core/memory_usage/callback_server.cc b/test/core/memory_usage/callback_server.cc index 4cdcee17cbb..6d69cb63243 100644 --- a/test/core/memory_usage/callback_server.cc +++ b/test/core/memory_usage/callback_server.cc @@ -47,7 +47,6 @@ class ServerCallbackImpl final grpc::CallbackServerContext* context, const grpc::testing::SimpleRequest* request, grpc::testing::SimpleResponse* response) override { - gpr_log(GPR_INFO, "UnaryCall RPC CALL RECEIVED"); auto* reactor = context->DefaultReactor(); reactor->Finish(grpc::Status::OK); return reactor; diff --git a/test/core/memory_usage/memory_usage_test.cc b/test/core/memory_usage/memory_usage_test.cc index 299f05e6b24..838f9063e8c 100644 --- a/test/core/memory_usage/memory_usage_test.cc +++ b/test/core/memory_usage/memory_usage_test.cc @@ -128,8 +128,11 @@ int RunChannelBenchmark(char* root) { std::vector client_flags = { absl::StrCat(root, "/memory_usage_callback_client", gpr_subprocess_binary_extension()), - "--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure", - absl::StrCat("--server_pid=", svr.GetPID())}; + "--target", + grpc_core::JoinHostPort("127.0.0.1", port), + "--nosecure", + absl::StrCat("--server_pid=", svr.GetPID()), + absl::StrCat("--size=", absl::GetFlag(FLAGS_size))}; Subprocess cli(client_flags); /* wait for completion */ if ((status = cli.Join()) != 0) {