Create multiple channels in per channel memory benchmark (#30441)

* Added new files for channel client/server

* Committing to switch branch

* Rebasing branch

* Switching branch

* Server process getting called

* Still working

* RPC received success, with sleep

* gRPC Receive success, grpc timeout

* Earlier but Clang tidy

* Fix timeout issue, remove some logs

* Added signint handler, test passing but flaky

* added sleep to reduce flakiness, removed some dependencies, changed LOG to gpr_log

* Changed benchmark_name default back to call

* remove deleted files

* grpc shutdown timeout

* trying to add shutdown

* Some changes

* Removed shutdown

* Automated change: Fix sanity tests

* Changes for review comments

* Changed comments

* Changed benchmark driver defaults so that CI testing would happen for all benchmarks

* Get server memory using RPC

* Add PID method to get memory

* Added gpr_subprocess_get_process_id to windows

* Removed GetAfterSnapshot since theres a not RPC method to get memory

* Automated change: Fix sanity tests

* Changed benchmark driver defaults so that CI testing would happen for all benchmarks

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* Forgot semicolon

* Fix includes

* Automated change: Fix sanity tests

* Added GetMemUsage and changed Snapshot and callback server and client to call it

* Moved GetMemUsage from header file

* removed some unnecessary includes

* Automated change: Fix sanity tests

* Updating build file

* forgot a comma

* Added tags to BUILD for memstats

* Automated change: Fix sanity tests

* Switching branches

* Changed channel creation and GetBeforeSnapshot

* Switching branches

* Intermediate commit

* Revert merge commit

* Fixed merge issues

* Automated change: Fix sanity tests

* Changes to fix asan failures

* Automated change: Fix sanity tests

* Fix comment

Co-authored-by: nancylucy01 <nancylucy01@users.noreply.github.com>
pull/30482/head
nancylucy01 2 years ago committed by GitHub
parent 4bc69cbe38
commit 2284be0cdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      test/core/memory_usage/BUILD
  2. 107
      test/core/memory_usage/callback_client.cc
  3. 1
      test/core/memory_usage/callback_server.cc
  4. 7
      test/core/memory_usage/memory_usage_test.cc

@ -91,6 +91,7 @@ grpc_cc_binary(
external_deps = [
"absl/flags:flag",
"absl/flags:parse",
"absl/time",
"gtest",
],
tags = [

@ -16,16 +16,23 @@
*
*/
#include <limits.h>
#include <chrono>
#include <memory>
#include <string>
#include <vector>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"
#include "absl/utility/utility.h"
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/status.h>
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
@ -36,8 +43,9 @@
ABSL_FLAG(std::string, target, "", "Target host:port");
ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
ABSL_FLAG(int, server_pid, 99999, "Server's pid");
ABSL_FLAG(int, size, 50, "Number of channels");
std::unique_ptr<grpc::testing::BenchmarkService::Stub> CreateStubForTest() {
std::shared_ptr<grpc::Channel> CreateChannelForTest(int index) {
// Set the authentication mechanism.
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
@ -46,58 +54,68 @@ std::unique_ptr<grpc::testing::BenchmarkService::Stub> CreateStubForTest() {
gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
}
// Channel args to prevent connection from closing after RPC is done
grpc::ChannelArguments channel_args;
channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_IDLE_MS, INT_MAX);
channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_AGE_MS, INT_MAX);
// Arg to bypass mechanism that combines channels on the serverside if they
// have the same channel args. Allows for one channel per connection
channel_args.SetInt("grpc.memory_usage_counter", index);
// Create a channel to the server and a stub
std::shared_ptr<grpc::Channel> channel =
CreateChannel(absl::GetFlag(FLAGS_target), creds);
std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
grpc::testing::BenchmarkService::NewStub(channel);
return stub;
CreateCustomChannel(absl::GetFlag(FLAGS_target), creds, channel_args);
return channel;
}
void UnaryCall() {
struct CallParams {
grpc::ClientContext context;
grpc::testing::SimpleRequest request;
grpc::testing::SimpleResponse response;
grpc::testing::MemorySize snapshot_response;
absl::Notification done;
};
// Simple Unary RPC to send to confirm connection is open
std::shared_ptr<CallParams> UnaryCall(std::shared_ptr<grpc::Channel> channel) {
std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
CreateStubForTest();
grpc::testing::BenchmarkService::NewStub(channel);
// Start a call.
struct CallParams {
grpc::ClientContext context;
grpc::testing::SimpleRequest request;
grpc::testing::SimpleResponse response;
};
CallParams* params = new CallParams();
auto params = std::make_shared<CallParams>();
stub->async()->UnaryCall(&params->context, &params->request,
&params->response, [](const grpc::Status& status) {
if (status.ok()) {
gpr_log(GPR_INFO, "UnaryCall RPC succeeded.");
} else {
&params->response,
[params](const grpc::Status& status) {
if (!status.ok()) {
gpr_log(GPR_ERROR, "UnaryCall RPC failed.");
}
params->done.Notify();
});
return params;
}
// Get memory usage of server's process before the server is made
long GetBeforeSnapshot() {
std::shared_ptr<CallParams> GetBeforeSnapshot(
std::shared_ptr<grpc::Channel> channel, long& before_server_memory) {
std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
CreateStubForTest();
grpc::testing::BenchmarkService::NewStub(channel);
// Start a call.
struct CallParams {
grpc::ClientContext context;
grpc::testing::SimpleRequest request;
grpc::testing::MemorySize response;
};
CallParams* params = new CallParams();
auto params = std::make_shared<CallParams>();
stub->async()->GetBeforeSnapshot(
&params->context, &params->request, &params->response,
[params](const grpc::Status& status) {
&params->context, &params->request, &params->snapshot_response,
[params, &before_server_memory](const grpc::Status& status) {
if (status.ok()) {
gpr_log(GPR_INFO, "Before: %ld", params->response.rss());
before_server_memory = params->snapshot_response.rss();
gpr_log(GPR_INFO, "Server Before RPC: %ld",
params->snapshot_response.rss());
gpr_log(GPR_INFO, "GetBeforeSnapshot succeeded.");
} else {
gpr_log(GPR_ERROR, "GetBeforeSnapshot failed.");
}
params->done.Notify();
});
return params->response.rss();
return params;
}
int main(int argc, char** argv) {
@ -111,20 +129,45 @@ int main(int argc, char** argv) {
return 1;
}
gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str());
gpr_log(GPR_INFO, "Client Size: %d", absl::GetFlag(FLAGS_size));
// Getting initial memory usage
long before_server_memory = GetBeforeSnapshot();
std::shared_ptr<grpc::Channel> get_memory_channel = CreateChannelForTest(0);
long before_server_memory;
GetBeforeSnapshot(get_memory_channel, before_server_memory)
->done.WaitForNotification();
long before_client_memory = GetMemUsage();
UnaryCall();
// Create the channels and send an RPC to confirm they're open
int size = absl::GetFlag(FLAGS_size);
std::vector<std::shared_ptr<grpc::Channel>> channels_list(size);
for (int i = 0; i < size; ++i) {
std::shared_ptr<grpc::Channel> channel = CreateChannelForTest(i);
channels_list[i] = channel;
UnaryCall(channel)->done.WaitForNotification();
}
// Getting peak memory usage
long peak_server_memory = GetMemUsage(absl::GetFlag(FLAGS_server_pid));
long peak_client_memory = GetMemUsage();
gpr_log(GPR_INFO, "Before Server Mem: %ld", before_server_memory);
gpr_log(GPR_INFO, "Before Client Mem: %ld", before_client_memory);
gpr_log(GPR_INFO, "Peak Client Mem: %ld", peak_client_memory);
gpr_log(GPR_INFO, "Peak Server Mem: %ld", peak_server_memory);
gpr_log(GPR_INFO, "Peak Client Mem: %ld", peak_client_memory);
gpr_log(GPR_INFO, "Server Per Channel Memory: %f",
static_cast<float>(peak_server_memory - before_server_memory) /
static_cast<float>(size));
gpr_log(GPR_INFO, "Client Per Channel Memory: %f",
static_cast<float>(peak_client_memory - before_client_memory) /
static_cast<float>(size));
// Checking that all channels are still open
for (int i = 0; i < size; ++i) {
GPR_ASSERT(!absl::exchange(channels_list[i], nullptr)
->WaitForStateChange(GRPC_CHANNEL_READY,
std::chrono::system_clock::now() +
std::chrono::milliseconds(1)));
}
gpr_log(GPR_INFO, "Client Done");
return 0;
}

@ -47,7 +47,6 @@ class ServerCallbackImpl final
grpc::CallbackServerContext* context,
const grpc::testing::SimpleRequest* request,
grpc::testing::SimpleResponse* response) override {
gpr_log(GPR_INFO, "UnaryCall RPC CALL RECEIVED");
auto* reactor = context->DefaultReactor();
reactor->Finish(grpc::Status::OK);
return reactor;

@ -128,8 +128,11 @@ int RunChannelBenchmark(char* root) {
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_callback_client",
gpr_subprocess_binary_extension()),
"--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure",
absl::StrCat("--server_pid=", svr.GetPID())};
"--target",
grpc_core::JoinHostPort("127.0.0.1", port),
"--nosecure",
absl::StrCat("--server_pid=", svr.GetPID()),
absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
Subprocess cli(client_flags);
/* wait for completion */
if ((status = cli.Join()) != 0) {

Loading…
Cancel
Save