Basic callback client and server for new perchannel benchmark (#30305)

* Added new files for channel client/server

* Committing to switch branch

* Rebasing branch

* Switching branch

* Server process getting called

* Still working

* RPC received success, with sleep

* gRPC Receive success, grpc timeout

* Earlier but Clang tidy

* Fix timeout issue, remove some logs

* Added signint handler, test passing but flaky

* added sleep to reduce flakiness, removed some dependencies, changed LOG to gpr_log

* Changed benchmark_name default back to call

* remove deleted files

* grpc shutdown timeout

* trying to add shutdown

* Some changes

* Removed shutdown

* Automated change: Fix sanity tests

* Changes for review comments

* Changed comments

* Changed benchmark driver defaults so that CI testing would happen for all benchmarks

* Automated change: Fix sanity tests

Co-authored-by: nancylucy01 <nancylucy01@users.noreply.github.com>
pull/29936/head^2
nancylucy01 2 years ago committed by GitHub
parent cacbd74f5d
commit 3cd368b28d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 52
      test/core/memory_usage/BUILD
  2. 106
      test/core/memory_usage/callback_client.cc
  3. 107
      test/core/memory_usage/callback_server.cc
  4. 133
      test/core/memory_usage/memory_usage_test.cc
  5. 2
      tools/profiling/memory/memory_diff.py

@ -67,10 +67,62 @@ grpc_cc_binary(
],
)
grpc_cc_binary(
name = "memory_usage_callback_client",
testonly = 1,
srcs = ["callback_client.cc"],
external_deps = [
"absl/flags:flag",
"absl/flags:parse",
"gtest",
],
tags = [
"bazel_only",
"no_mac",
"no_windows",
],
deps = [
":memstats",
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing:benchmark_service_proto",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_binary(
name = "memory_usage_callback_server",
testonly = 1,
srcs = ["callback_server.cc"],
external_deps = [
"absl/flags:flag",
"absl/flags:parse",
"gtest",
],
tags = [
"bazel_only",
"no_mac",
"no_windows",
],
deps = [
":memstats",
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing:benchmark_service_proto",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "memory_usage_test",
srcs = ["memory_usage_test.cc"],
data = [
":memory_usage_callback_client",
":memory_usage_callback_server",
":memory_usage_client",
":memory_usage_server",
],

@ -0,0 +1,106 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <string.h>
#include <gtest/gtest.h>
#include "absl/algorithm/container.h"
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "util/logging.h"
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/client_callback.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/cpp/client/secure_credentials.h"
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/port.h"
#include "test/core/util/subprocess.h"
#include "test/core/util/test_config.h"
ABSL_FLAG(std::string, target, "", "Target host:port");
ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
void UnaryCall() {
// Set the authentication mechanism.
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
if (absl::GetFlag(FLAGS_secure)) {
// TODO (chennancy) Add in secure credentials
gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
}
// Create a channel to the server and a stub
std::shared_ptr<grpc::Channel> channel =
CreateChannel(absl::GetFlag(FLAGS_target), creds);
std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
grpc::testing::BenchmarkService::NewStub(channel);
// Start a call.
struct CallParams {
grpc::ClientContext context;
grpc::testing::SimpleRequest request;
grpc::testing::SimpleResponse response;
};
CallParams* params = new CallParams();
stub->async()->UnaryCall(&params->context, &params->request,
&params->response, [](const grpc::Status& status) {
if (status.ok()) {
gpr_log(GPR_INFO, "UnaryCall RPC succeeded.");
} else {
gpr_log(GPR_ERROR, "UnaryCall RPC failed.");
}
});
}
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
char* fake_argv[1];
GPR_ASSERT(argc >= 1);
fake_argv[0] = argv[0];
grpc::testing::TestEnvironment env(&argc, argv);
if (absl::GetFlag(FLAGS_target).empty()) {
gpr_log(GPR_ERROR, "Client: No target port entered");
return 1;
}
gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str());
UnaryCall();
gpr_log(GPR_INFO, "Client Done");
return 0;
}

@ -0,0 +1,107 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <string.h>
#include <thread>
#include <gtest/gtest.h>
#include "absl/algorithm/container.h"
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "util/logging.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/client_callback.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/cpp/client/secure_credentials.h"
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "test/core/memory_usage/memstats.h"
#include "test/core/util/port.h"
#include "test/core/util/subprocess.h"
#include "test/core/util/test_config.h"
class ServerCallbackImpl final
: public grpc::testing::BenchmarkService::CallbackService {
grpc::ServerUnaryReactor* UnaryCall(
grpc::CallbackServerContext* context,
const grpc::testing::SimpleRequest* request,
grpc::testing::SimpleResponse* response) override {
gpr_log(GPR_INFO, "RPC CALL RECEIVED");
auto* reactor = context->DefaultReactor();
reactor->Finish(grpc::Status::OK);
return reactor;
}
};
/* We have some sort of deadlock, so let's not exit gracefully for now.
TODO(chennancy) Add graceful shutdown */
static void sigint_handler(int /*x*/) { _exit(0); }
ABSL_FLAG(std::string, bind, "", "Bind host:port");
ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
char* fake_argv[1];
GPR_ASSERT(argc >= 1);
fake_argv[0] = argv[0];
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
signal(SIGINT, sigint_handler);
std::string server_address = absl::GetFlag(FLAGS_bind);
if (server_address.empty()) {
gpr_log(GPR_ERROR, "Server: No port entered");
return 1;
}
gpr_log(GPR_INFO, "Server port: %s", server_address.c_str());
ServerCallbackImpl callback_server;
grpc::ServerBuilder builder;
// Set the authentication mechanism.
std::shared_ptr<grpc::ServerCredentials> creds =
grpc::InsecureServerCredentials();
if (absl::GetFlag(FLAGS_secure)) {
gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
// TODO (chennancy) Add in secure credentials
}
builder.AddListeningPort(server_address, creds);
builder.RegisterService(&callback_server);
// Set up the server to start accepting requests.
std::shared_ptr<grpc::Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.c_str());
// Keep the program running until the server shuts down.
server->Wait();
return 0;
}

@ -25,18 +25,24 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "util/logging.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/host_port.h"
#include "test/core/util/port.h"
#include "test/core/util/subprocess.h"
#include "test/core/util/test_config.h"
ABSL_FLAG(std::string, benchmark_name, "call", "Which benchmark to run");
ABSL_FLAG(int, size, 50000, "Number of channels/calls");
ABSL_FLAG(std::string, benchmark_names, "call,channel",
"Which benchmark to run"); // Default all benchmarks in order to
// trigger CI testing for each one
ABSL_FLAG(int, size, 1000, "Number of channels/calls");
ABSL_FLAG(std::string, scenario_config, "insecure",
"Possible Values: minstack (Use minimal stack), resource_quota, "
"secure (Use SSL credentials on server)");
@ -63,15 +69,94 @@ class Subprocess {
gpr_subprocess* process_;
};
/* per-call memory usage benchmark */
int RunCallBenchmark(char* root, std::vector<std::string> server_scenario_flags,
std::vector<std::string> client_scenario_flags) {
int status;
int port = grpc_pick_unused_port_or_die();
/* start the server */
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_server",
gpr_subprocess_binary_extension()),
"--bind", grpc_core::JoinHostPort("::", port)};
// Add scenario-specific server flags to the end of the server_flags
absl::c_move(server_scenario_flags, std::back_inserter(server_flags));
Subprocess svr(server_flags);
/* start the client */
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_client",
gpr_subprocess_binary_extension()),
"--target", grpc_core::JoinHostPort("127.0.0.1", port),
absl::StrCat("--warmup=", 10000),
absl::StrCat("--benchmark=", absl::GetFlag(FLAGS_size))};
// Add scenario-specific client flags to the end of the client_flags
absl::c_move(client_scenario_flags, std::back_inserter(client_flags));
Subprocess cli(client_flags);
/* wait for completion */
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
return 1;
}
svr.Interrupt();
return svr.Join() == 0 ? 0 : 2;
}
/* Per-channel benchmark*/
int RunChannelBenchmark(char* root) {
// TODO(chennancy) Add the scenario specific flags
int status;
int port = grpc_pick_unused_port_or_die();
/* start the server */
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_callback_server",
gpr_subprocess_binary_extension()),
"--bind", grpc_core::JoinHostPort("::", port)};
Subprocess svr(server_flags);
// Wait one second before starting client to avoid possible race condition
// of client sending an RPC before the server is set up
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
/* start the client */
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_callback_client",
gpr_subprocess_binary_extension()),
"--target", grpc_core::JoinHostPort("127.0.0.1", port), "--nosecure"};
Subprocess cli(client_flags);
/* wait for completion */
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
return 1;
}
svr.Interrupt();
return svr.Join() == 0 ? 0 : 2;
}
int RunBenchmark(char* root, absl::string_view benchmark,
std::vector<std::string> server_scenario_flags,
std::vector<std::string> client_scenario_flags) {
if (benchmark == "call") {
return RunCallBenchmark(root, server_scenario_flags, client_scenario_flags);
} else if (benchmark == "channel") {
return RunChannelBenchmark(root);
} else {
gpr_log(GPR_INFO, "Not a valid benchmark name");
return 4;
}
}
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
char* me = argv[0];
char* lslash = strrchr(me, '/');
char root[1024];
int port = grpc_pick_unused_port_or_die();
std::vector<const char*> args;
int status;
/* figure out where we are */
if (lslash) {
memcpy(root, me, static_cast<size_t>(lslash - me));
@ -98,37 +183,13 @@ int main(int argc, char** argv) {
return 3;
}
/* per-call memory usage benchmark */
if (absl::GetFlag(FLAGS_benchmark_name) == "call") {
/* start the server */
std::vector<std::string> server_flags = {
absl::StrCat(root, "/memory_usage_server",
gpr_subprocess_binary_extension()),
"--bind", grpc_core::JoinHostPort("::", port)};
// Add scenario-specific server flags to the end of the server_flags
absl::c_move(it_scenario->second.server, std::back_inserter(server_flags));
Subprocess svr(server_flags);
/* start the client */
std::vector<std::string> client_flags = {
absl::StrCat(root, "/memory_usage_client",
gpr_subprocess_binary_extension()),
"--target", grpc_core::JoinHostPort("127.0.0.1", port),
absl::StrCat("--warmup=", 10000),
absl::StrCat("--benchmark=", absl::GetFlag(FLAGS_size))};
// Add scenario-specific client flags to the end of the client_flags
absl::c_move(it_scenario->second.client, std::back_inserter(client_flags));
Subprocess cli(client_flags);
/* wait for completion */
if ((status = cli.Join()) != 0) {
printf("client failed with: %d", status);
return 1;
}
svr.Interrupt();
return svr.Join() == 0 ? 0 : 2;
// Run all benchmarks listed (Multiple benchmarks usually only for default
// scenario)
auto benchmarks = absl::StrSplit(absl::GetFlag(FLAGS_benchmark_names), ',');
for (const auto& benchmark : benchmarks) {
int r = RunBenchmark(root, benchmark, it_scenario->second.server,
it_scenario->second.client);
if (r != 0) return r;
}
printf("Command line args couldn't be parsed\n");
return 4;
return 0;
}

@ -66,6 +66,8 @@ def _run():
try:
output = subprocess.check_output([
'bazel-bin/test/core/memory_usage/memory_usage_test',
'--benchmark_names=call',
'--size=50000',
] + extra_args)
except subprocess.CalledProcessError as e:
print('Error running benchmark:', e)

Loading…
Cancel
Save