Fix to #30390 Adding methods to get memory usage of server for the perchannel benchmark (#30435)

* Revert "Revert "Adding methods to get memory usage of server for the perchannel benchmark (#30390)" (#30433)"

This reverts commit 3fe438b55a.

* Made all variables used
pull/30443/head
nancylucy01 3 years ago committed by GitHub
parent 585c00f6f4
commit 57e06148e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/proto/grpc/testing/benchmark_service.proto
  2. 4
      src/proto/grpc/testing/messages.proto
  3. 15
      test/core/memory_usage/BUILD
  4. 47
      test/core/memory_usage/callback_client.cc
  5. 26
      test/core/memory_usage/callback_server.cc
  6. 4
      test/core/memory_usage/memory_usage_test.cc
  7. 56
      test/core/memory_usage/memstats.cc
  8. 13
      test/core/memory_usage/memstats.h
  9. 1
      test/core/util/subprocess.h
  10. 2
      test/core/util/subprocess_posix.cc
  11. 4
      test/core/util/subprocess_windows.cc

@ -41,4 +41,7 @@ service BenchmarkService {
// Two-sided unbounded streaming between server to client // Two-sided unbounded streaming between server to client
// Both sides send the content of their own choice to the other // Both sides send the content of their own choice to the other
rpc StreamingBothWays(stream SimpleRequest) returns (stream SimpleResponse); rpc StreamingBothWays(stream SimpleRequest) returns (stream SimpleResponse);
//Get the memory usage of process before server is made
rpc GetBeforeSnapshot(SimpleRequest) returns (MemorySize);
} }

@ -268,3 +268,7 @@ message ClientConfigureRequest {
// Response for updating a test client's configuration. // Response for updating a test client's configuration.
message ClientConfigureResponse {} message ClientConfigureResponse {}
message MemorySize {
int64 rss = 1;
}

@ -21,7 +21,22 @@ licenses(["notice"])
grpc_cc_library( grpc_cc_library(
name = "memstats", name = "memstats",
testonly = 1, testonly = 1,
srcs = [
"memstats.cc",
"memstats.h",
],
hdrs = ["memstats.h"], hdrs = ["memstats.h"],
external_deps = [
"absl/types:optional",
],
tags = [
"bazel_only",
"no_mac",
"no_windows",
],
deps = [
"//:gpr",
],
) )
grpc_cc_binary( grpc_cc_binary(

@ -30,12 +30,14 @@
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/messages.pb.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
ABSL_FLAG(std::string, target, "", "Target host:port"); ABSL_FLAG(std::string, target, "", "Target host:port");
ABSL_FLAG(bool, secure, false, "Use SSL Credentials"); ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
ABSL_FLAG(int, server_pid, 99999, "Server's pid");
void UnaryCall() { std::unique_ptr<grpc::testing::BenchmarkService::Stub> CreateStubForTest() {
// Set the authentication mechanism. // Set the authentication mechanism.
std::shared_ptr<grpc::ChannelCredentials> creds = std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials(); grpc::InsecureChannelCredentials();
@ -49,6 +51,12 @@ void UnaryCall() {
CreateChannel(absl::GetFlag(FLAGS_target), creds); CreateChannel(absl::GetFlag(FLAGS_target), creds);
std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub = std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
grpc::testing::BenchmarkService::NewStub(channel); grpc::testing::BenchmarkService::NewStub(channel);
return stub;
}
void UnaryCall() {
std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
CreateStubForTest();
// Start a call. // Start a call.
struct CallParams { struct CallParams {
@ -67,6 +75,31 @@ void UnaryCall() {
}); });
} }
// Get memory usage of server's process before the server is made
long GetBeforeSnapshot() {
std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
CreateStubForTest();
// Start a call.
struct CallParams {
grpc::ClientContext context;
grpc::testing::SimpleRequest request;
grpc::testing::MemorySize response;
};
CallParams* params = new CallParams();
stub->async()->GetBeforeSnapshot(
&params->context, &params->request, &params->response,
[params](const grpc::Status& status) {
if (status.ok()) {
gpr_log(GPR_INFO, "Before: %ld", params->response.rss());
gpr_log(GPR_INFO, "GetBeforeSnapshot succeeded.");
} else {
gpr_log(GPR_ERROR, "GetBeforeSnapshot failed.");
}
});
return params->response.rss();
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv); absl::ParseCommandLine(argc, argv);
char* fake_argv[1]; char* fake_argv[1];
@ -79,7 +112,19 @@ int main(int argc, char** argv) {
} }
gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str()); gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str());
// Getting initial memory usage
long before_server_memory = GetBeforeSnapshot();
long before_client_memory = GetMemUsage();
UnaryCall(); UnaryCall();
// Getting peak memory usage
long peak_server_memory = GetMemUsage(absl::GetFlag(FLAGS_server_pid));
long peak_client_memory = GetMemUsage();
gpr_log(GPR_INFO, "Before Server Mem: %ld", before_server_memory);
gpr_log(GPR_INFO, "Before Client Mem: %ld", before_client_memory);
gpr_log(GPR_INFO, "Peak Client Mem: %ld", peak_client_memory);
gpr_log(GPR_INFO, "Peak Server Mem: %ld", peak_server_memory);
gpr_log(GPR_INFO, "Client Done"); gpr_log(GPR_INFO, "Client Done");
return 0; return 0;
} }

@ -34,19 +34,37 @@
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h" #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/messages.pb.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
class ServerCallbackImpl final class ServerCallbackImpl final
: public grpc::testing::BenchmarkService::CallbackService { : public grpc::testing::BenchmarkService::CallbackService {
public:
explicit ServerCallbackImpl(long before_server_memory)
: before_server_create(before_server_memory) {}
grpc::ServerUnaryReactor* UnaryCall( grpc::ServerUnaryReactor* UnaryCall(
grpc::CallbackServerContext* context, grpc::CallbackServerContext* context,
const grpc::testing::SimpleRequest* request, const grpc::testing::SimpleRequest* request,
grpc::testing::SimpleResponse* response) override { grpc::testing::SimpleResponse* response) override {
gpr_log(GPR_INFO, "RPC CALL RECEIVED"); gpr_log(GPR_INFO, "UnaryCall RPC CALL RECEIVED");
auto* reactor = context->DefaultReactor(); auto* reactor = context->DefaultReactor();
reactor->Finish(grpc::Status::OK); reactor->Finish(grpc::Status::OK);
return reactor; return reactor;
} }
grpc::ServerUnaryReactor* GetBeforeSnapshot(
grpc::CallbackServerContext* context,
const grpc::testing::SimpleRequest* request,
grpc::testing::MemorySize* response) override {
gpr_log(GPR_INFO, "BeforeSnapshot RPC CALL RECEIVED");
response->set_rss(before_server_create);
auto* reactor = context->DefaultReactor();
reactor->Finish(grpc::Status::OK);
return reactor;
}
private:
long before_server_create;
}; };
/* We have some sort of deadlock, so let's not exit gracefully for now. /* We have some sort of deadlock, so let's not exit gracefully for now.
@ -71,8 +89,12 @@ int main(int argc, char** argv) {
} }
gpr_log(GPR_INFO, "Server port: %s", server_address.c_str()); gpr_log(GPR_INFO, "Server port: %s", server_address.c_str());
ServerCallbackImpl callback_server; // Get initial process memory usage before creating server
long before_server_create = GetMemUsage();
gpr_log(GPR_INFO, "Server Before Mem: %ld", before_server_create);
ServerCallbackImpl callback_server(before_server_create);
grpc::ServerBuilder builder; grpc::ServerBuilder builder;
// Set the authentication mechanism. // Set the authentication mechanism.
std::shared_ptr<grpc::ServerCredentials> creds = std::shared_ptr<grpc::ServerCredentials> creds =
grpc::InsecureServerCredentials(); grpc::InsecureServerCredentials();

@ -62,6 +62,7 @@ class Subprocess {
process_ = gpr_subprocess_create(args_c.size(), args_c.data()); process_ = gpr_subprocess_create(args_c.size(), args_c.data());
} }
int GetPID() { return gpr_subprocess_get_process_id(process_); }
int Join() { return gpr_subprocess_join(process_); } int Join() { return gpr_subprocess_join(process_); }
void Interrupt() { gpr_subprocess_interrupt(process_); } void Interrupt() { gpr_subprocess_interrupt(process_); }
@ -127,7 +128,8 @@ int RunChannelBenchmark(char* root) {
std::vector<std::string> client_flags = { std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_callback_client", absl::StrCat(root, "/memory_usage_callback_client",
gpr_subprocess_binary_extension()), gpr_subprocess_binary_extension()),
"--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure"}; "--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure",
absl::StrCat("--server_pid=", svr.GetPID())};
Subprocess cli(client_flags); Subprocess cli(client_flags);
/* wait for completion */ /* wait for completion */
if ((status = cli.Join()) != 0) { if ((status = cli.Join()) != 0) {

@ -0,0 +1,56 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/memory_usage/memstats.h"
#include <unistd.h>
#include <fstream>
#include <string>
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
long GetMemUsage(absl::optional<int> pid) {
// Default is getting memory usage for self (calling process)
std::string path = "/proc/self/stat";
if (pid != absl::nullopt) {
path = absl::StrCat("/proc/", pid.value(), "/stat");
}
std::ifstream stat_stream(path, std::ios_base::in);
double resident_set = 0.0;
// Temporary variables for irrelevant leading entries in stats
std::string temp_pid, comm, state, ppid, pgrp, session, tty_nr;
std::string tpgid, flags, minflt, cminflt, majflt, cmajflt;
std::string utime, stime, cutime, cstime, priority, nice;
std::string O, itrealvalue, starttime, vsize;
// Get rss to find memory usage
long rss;
stat_stream >> temp_pid >> comm >> state >> ppid >> pgrp >> session >>
tty_nr >> tpgid >> flags >> minflt >> cminflt >> majflt >> cmajflt >>
utime >> stime >> cutime >> cstime >> priority >> nice >> O >>
itrealvalue >> starttime >> vsize >> rss;
stat_stream.close();
// pid does not connect to an existing process
GPR_ASSERT(!state.empty());
// Calculations in case x86-64 is configured to use 2MB pages
long page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024;
resident_set = rss * page_size_kb;
return resident_set;
}

@ -15,18 +15,17 @@
#ifndef TEST_H #ifndef TEST_H
#define TEST_H #define TEST_H
#include <stdlib.h> #include "absl/types/optional.h"
#include <sys/resource.h>
// IWYU pragma: no_include <bits/types/struct_rusage.h> // IWYU pragma: no_include <bits/types/struct_rusage.h>
// Get the memory usage of either the calling process or another process using
// the pid
long GetMemUsage(absl::optional<int> pid = absl::nullopt);
struct MemStats { struct MemStats {
long rss; // Resident set size, in kb long rss; // Resident set size, in kb
static MemStats Snapshot() { static MemStats Snapshot() { return MemStats{GetMemUsage()}; }
struct rusage usage;
if (0 != getrusage(RUSAGE_SELF, &usage)) abort();
return MemStats{usage.ru_maxrss};
}
}; };
#endif #endif

@ -32,5 +32,6 @@ void gpr_subprocess_destroy(gpr_subprocess* p);
/** returns exit status; can be called at most once */ /** returns exit status; can be called at most once */
int gpr_subprocess_join(gpr_subprocess* p); int gpr_subprocess_join(gpr_subprocess* p);
void gpr_subprocess_interrupt(gpr_subprocess* p); void gpr_subprocess_interrupt(gpr_subprocess* p);
int gpr_subprocess_get_process_id(gpr_subprocess* p);
#endif /* GRPC_TEST_CORE_UTIL_SUBPROCESS_H */ #endif /* GRPC_TEST_CORE_UTIL_SUBPROCESS_H */

@ -92,4 +92,6 @@ void gpr_subprocess_interrupt(gpr_subprocess* p) {
} }
} }
int gpr_subprocess_get_process_id(gpr_subprocess* p) { return p->pid; }
#endif /* GPR_POSIX_SUBPROCESS */ #endif /* GPR_POSIX_SUBPROCESS */

@ -124,4 +124,8 @@ void gpr_subprocess_interrupt(gpr_subprocess* p) {
return; return;
} }
int gpr_subprocess_get_process_id(gpr_subprocess* p) {
return p->pi.dwProcessId;
}
#endif /* GPR_WINDOWS_SUBPROCESS */ #endif /* GPR_WINDOWS_SUBPROCESS */

Loading…
Cancel
Save