diff --git a/src/proto/grpc/testing/benchmark_service.proto b/src/proto/grpc/testing/benchmark_service.proto index 63167a8cee6..269d8a356aa 100644 --- a/src/proto/grpc/testing/benchmark_service.proto +++ b/src/proto/grpc/testing/benchmark_service.proto @@ -41,4 +41,7 @@ service BenchmarkService { // Two-sided unbounded streaming between server to client // Both sides send the content of their own choice to the other rpc StreamingBothWays(stream SimpleRequest) returns (stream SimpleResponse); + + //Get the memory usage of process before server is made + rpc GetBeforeSnapshot(SimpleRequest) returns (MemorySize); } diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index 559876ed7c0..1a03a6fd6ee 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -268,3 +268,7 @@ message ClientConfigureRequest { // Response for updating a test client's configuration. message ClientConfigureResponse {} + +message MemorySize { + int64 rss = 1; +} diff --git a/test/core/memory_usage/BUILD b/test/core/memory_usage/BUILD index 0a889b4d8de..d16a4a5b153 100644 --- a/test/core/memory_usage/BUILD +++ b/test/core/memory_usage/BUILD @@ -21,7 +21,22 @@ licenses(["notice"]) grpc_cc_library( name = "memstats", testonly = 1, + srcs = [ + "memstats.cc", + "memstats.h", + ], hdrs = ["memstats.h"], + external_deps = [ + "absl/types:optional", + ], + tags = [ + "bazel_only", + "no_mac", + "no_windows", + ], + deps = [ + "//:gpr", + ], ) grpc_cc_binary( diff --git a/test/core/memory_usage/callback_client.cc b/test/core/memory_usage/callback_client.cc index fa2dd1e1f4b..4138d6e3552 100644 --- a/test/core/memory_usage/callback_client.cc +++ b/test/core/memory_usage/callback_client.cc @@ -30,12 +30,14 @@ #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" #include "src/proto/grpc/testing/messages.pb.h" +#include "test/core/memory_usage/memstats.h" #include "test/core/util/test_config.h" ABSL_FLAG(std::string, target, "", "Target host:port"); ABSL_FLAG(bool, secure, false, "Use SSL Credentials"); +ABSL_FLAG(int, server_pid, 99999, "Server's pid"); -void UnaryCall() { +std::unique_ptr CreateStubForTest() { // Set the authentication mechanism. std::shared_ptr creds = grpc::InsecureChannelCredentials(); @@ -49,6 +51,12 @@ void UnaryCall() { CreateChannel(absl::GetFlag(FLAGS_target), creds); std::unique_ptr stub = grpc::testing::BenchmarkService::NewStub(channel); + return stub; +} + +void UnaryCall() { + std::unique_ptr stub = + CreateStubForTest(); // Start a call. struct CallParams { @@ -67,6 +75,31 @@ void UnaryCall() { }); } +// Get memory usage of server's process before the server is made +long GetBeforeSnapshot() { + std::unique_ptr stub = + CreateStubForTest(); + + // Start a call. + struct CallParams { + grpc::ClientContext context; + grpc::testing::SimpleRequest request; + grpc::testing::MemorySize response; + }; + CallParams* params = new CallParams(); + stub->async()->GetBeforeSnapshot( + ¶ms->context, ¶ms->request, ¶ms->response, + [params](const grpc::Status& status) { + if (status.ok()) { + gpr_log(GPR_INFO, "Before: %ld", params->response.rss()); + gpr_log(GPR_INFO, "GetBeforeSnapshot succeeded."); + } else { + gpr_log(GPR_ERROR, "GetBeforeSnapshot failed."); + } + }); + return params->response.rss(); +} + int main(int argc, char** argv) { absl::ParseCommandLine(argc, argv); char* fake_argv[1]; @@ -79,7 +112,18 @@ int main(int argc, char** argv) { } gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str()); + // Getting initial memory usage + long before_server_memory = GetBeforeSnapshot(); + long before_client_memory = GetMemUsage(); + UnaryCall(); + + // Getting peak memory usage + long peak_server_memory = GetMemUsage(absl::GetFlag(FLAGS_server_pid)); + long peak_client_memory = GetMemUsage(); + gpr_log(GPR_INFO, "Before Client Mem: %ld", before_client_memory); + gpr_log(GPR_INFO, "Peak Client Mem: %ld", peak_client_memory); + gpr_log(GPR_INFO, "Peak Server Mem: %ld", peak_server_memory); gpr_log(GPR_INFO, "Client Done"); return 0; } diff --git a/test/core/memory_usage/callback_server.cc b/test/core/memory_usage/callback_server.cc index 98934f96ea8..4cdcee17cbb 100644 --- a/test/core/memory_usage/callback_server.cc +++ b/test/core/memory_usage/callback_server.cc @@ -34,19 +34,37 @@ #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" #include "src/proto/grpc/testing/messages.pb.h" +#include "test/core/memory_usage/memstats.h" #include "test/core/util/test_config.h" class ServerCallbackImpl final : public grpc::testing::BenchmarkService::CallbackService { + public: + explicit ServerCallbackImpl(long before_server_memory) + : before_server_create(before_server_memory) {} + grpc::ServerUnaryReactor* UnaryCall( grpc::CallbackServerContext* context, const grpc::testing::SimpleRequest* request, grpc::testing::SimpleResponse* response) override { - gpr_log(GPR_INFO, "RPC CALL RECEIVED"); + gpr_log(GPR_INFO, "UnaryCall RPC CALL RECEIVED"); auto* reactor = context->DefaultReactor(); reactor->Finish(grpc::Status::OK); return reactor; } + grpc::ServerUnaryReactor* GetBeforeSnapshot( + grpc::CallbackServerContext* context, + const grpc::testing::SimpleRequest* request, + grpc::testing::MemorySize* response) override { + gpr_log(GPR_INFO, "BeforeSnapshot RPC CALL RECEIVED"); + response->set_rss(before_server_create); + auto* reactor = context->DefaultReactor(); + reactor->Finish(grpc::Status::OK); + return reactor; + } + + private: + long before_server_create; }; /* We have some sort of deadlock, so let's not exit gracefully for now. @@ -71,8 +89,12 @@ int main(int argc, char** argv) { } gpr_log(GPR_INFO, "Server port: %s", server_address.c_str()); - ServerCallbackImpl callback_server; + // Get initial process memory usage before creating server + long before_server_create = GetMemUsage(); + gpr_log(GPR_INFO, "Server Before Mem: %ld", before_server_create); + ServerCallbackImpl callback_server(before_server_create); grpc::ServerBuilder builder; + // Set the authentication mechanism. std::shared_ptr creds = grpc::InsecureServerCredentials(); diff --git a/test/core/memory_usage/memory_usage_test.cc b/test/core/memory_usage/memory_usage_test.cc index d266a6ac973..299f05e6b24 100644 --- a/test/core/memory_usage/memory_usage_test.cc +++ b/test/core/memory_usage/memory_usage_test.cc @@ -62,6 +62,7 @@ class Subprocess { process_ = gpr_subprocess_create(args_c.size(), args_c.data()); } + int GetPID() { return gpr_subprocess_get_process_id(process_); } int Join() { return gpr_subprocess_join(process_); } void Interrupt() { gpr_subprocess_interrupt(process_); } @@ -127,7 +128,8 @@ int RunChannelBenchmark(char* root) { std::vector client_flags = { absl::StrCat(root, "/memory_usage_callback_client", gpr_subprocess_binary_extension()), - "--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure"}; + "--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure", + absl::StrCat("--server_pid=", svr.GetPID())}; Subprocess cli(client_flags); /* wait for completion */ if ((status = cli.Join()) != 0) { diff --git a/test/core/memory_usage/memstats.cc b/test/core/memory_usage/memstats.cc new file mode 100644 index 00000000000..f14cafc78d4 --- /dev/null +++ b/test/core/memory_usage/memstats.cc @@ -0,0 +1,56 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/memory_usage/memstats.h" + +#include + +#include +#include + +#include "absl/strings/str_cat.h" + +#include + +long GetMemUsage(absl::optional pid) { + // Default is getting memory usage for self (calling process) + std::string path = "/proc/self/stat"; + if (pid != absl::nullopt) { + path = absl::StrCat("/proc/", pid.value(), "/stat"); + } + std::ifstream stat_stream(path, std::ios_base::in); + + double resident_set = 0.0; + // Temporary variables for irrelevant leading entries in stats + std::string temp_pid, comm, state, ppid, pgrp, session, tty_nr; + std::string tpgid, flags, minflt, cminflt, majflt, cmajflt; + std::string utime, stime, cutime, cstime, priority, nice; + std::string O, itrealvalue, starttime, vsize; + + // Get rss to find memory usage + long rss; + stat_stream >> temp_pid >> comm >> state >> ppid >> pgrp >> session >> + tty_nr >> tpgid >> flags >> minflt >> cminflt >> majflt >> cmajflt >> + utime >> stime >> cutime >> cstime >> priority >> nice >> O >> + itrealvalue >> starttime >> vsize >> rss; + stat_stream.close(); + + // pid does not connect to an existing process + GPR_ASSERT(!state.empty()); + + // Calculations in case x86-64 is configured to use 2MB pages + long page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; + resident_set = rss * page_size_kb; + return resident_set; +} diff --git a/test/core/memory_usage/memstats.h b/test/core/memory_usage/memstats.h index 47da40946b1..65841755c6a 100644 --- a/test/core/memory_usage/memstats.h +++ b/test/core/memory_usage/memstats.h @@ -15,18 +15,17 @@ #ifndef TEST_H #define TEST_H -#include -#include +#include "absl/types/optional.h" // IWYU pragma: no_include +// Get the memory usage of either the calling process or another process using +// the pid +long GetMemUsage(absl::optional pid = absl::nullopt); + struct MemStats { long rss; // Resident set size, in kb - static MemStats Snapshot() { - struct rusage usage; - if (0 != getrusage(RUSAGE_SELF, &usage)) abort(); - return MemStats{usage.ru_maxrss}; - } + static MemStats Snapshot() { return MemStats{GetMemUsage()}; } }; #endif diff --git a/test/core/util/subprocess.h b/test/core/util/subprocess.h index c7fe9af435d..5e52d6abad7 100644 --- a/test/core/util/subprocess.h +++ b/test/core/util/subprocess.h @@ -32,5 +32,6 @@ void gpr_subprocess_destroy(gpr_subprocess* p); /** returns exit status; can be called at most once */ int gpr_subprocess_join(gpr_subprocess* p); void gpr_subprocess_interrupt(gpr_subprocess* p); +int gpr_subprocess_get_process_id(gpr_subprocess* p); #endif /* GRPC_TEST_CORE_UTIL_SUBPROCESS_H */ diff --git a/test/core/util/subprocess_posix.cc b/test/core/util/subprocess_posix.cc index 3278add9e38..aea071cdb26 100644 --- a/test/core/util/subprocess_posix.cc +++ b/test/core/util/subprocess_posix.cc @@ -92,4 +92,6 @@ void gpr_subprocess_interrupt(gpr_subprocess* p) { } } +int gpr_subprocess_get_process_id(gpr_subprocess* p) { return p->pid; } + #endif /* GPR_POSIX_SUBPROCESS */ diff --git a/test/core/util/subprocess_windows.cc b/test/core/util/subprocess_windows.cc index 7d69af1368f..dd94cea35dc 100644 --- a/test/core/util/subprocess_windows.cc +++ b/test/core/util/subprocess_windows.cc @@ -124,4 +124,8 @@ void gpr_subprocess_interrupt(gpr_subprocess* p) { return; } +int gpr_subprocess_get_process_id(gpr_subprocess* p) { + return p->pi.dwProcessId; +} + #endif /* GPR_WINDOWS_SUBPROCESS */