From 1d0fce9d60494936aa0e79c6b262d113b5b1acd5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 26 Oct 2016 08:26:22 -0700 Subject: [PATCH] Add some endpoint pair tests --- include/grpc++/support/channel_arguments.h | 7 ++ src/core/lib/surface/server.c | 11 +- test/cpp/microbenchmarks/bm_fullstack.cc | 131 +++++++++++++++++++-- 3 files changed, 138 insertions(+), 11 deletions(-) diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h index ae243939e9a..79a76d02ea6 100644 --- a/include/grpc++/support/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -91,6 +91,13 @@ class ChannelArguments { /// Set a textual argument \a value under \a key. void SetString(const grpc::string& key, const grpc::string& value); + grpc_channel_args c_args() { + grpc_channel_args out; + out.num_args = args_.size(); + out.args = args_.empty() ? NULL : &args_[0]; + return out; + } + private: friend class SecureChannelCredentials; friend class testing::ChannelArgumentsTest; diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 3a90308058c..648c23580a6 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -195,6 +195,7 @@ struct grpc_server { grpc_completion_queue **cqs; grpc_pollset **pollsets; size_t cq_count; + size_t pollset_count; bool started; /* The two following mutexes control access to server-state @@ -1084,7 +1085,7 @@ void grpc_server_start(grpc_server *server) { GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); server->started = true; - size_t pollset_count = 0; + server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); server->request_freelist_per_cq = gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); @@ -1092,7 +1093,8 @@ void grpc_server_start(grpc_server *server) { gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { - server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); + server->pollsets[server->pollset_count++] = + grpc_cq_pollset(server->cqs[i]); } server->request_freelist_per_cq[i] = gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); @@ -1111,7 +1113,8 @@ void grpc_server_start(grpc_server *server) { } for (l = server->listeners; l; l = l->next) { - l->start(&exec_ctx, server, l->arg, server->pollsets, pollset_count); + l->start(&exec_ctx, server, l->arg, server->pollsets, + server->pollset_count); } grpc_exec_ctx_finish(&exec_ctx); @@ -1119,7 +1122,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets, size_t *pollset_count) { - *pollset_count = server->cq_count; + *pollset_count = server->pollset_count; *pollsets = server->pollsets; } diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc index 33f2bf63468..11025caa44c 100644 --- a/test/cpp/microbenchmarks/bm_fullstack.cc +++ b/test/cpp/microbenchmarks/bm_fullstack.cc @@ -37,39 +37,133 @@ #include #include +#include #include #include #include #include #include -#include "src/proto/grpc/testing/echo.grpc.pb.h" +extern "C" { +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/endpoint_pair.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" +#include "test/core/util/passthru_endpoint.h" #include "test/core/util/port.h" +} +#include "src/cpp/client/create_channel_internal.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" #include "third_party/google_benchmark/include/benchmark/benchmark.h" namespace grpc { namespace testing { +static class InitializeStuff { + public: + InitializeStuff() { init_lib.init(); } + + private: + internal::GrpcLibrary init_lib; +} initialize_stuff; + /******************************************************************************* * FIXTURES */ -class TCPFixture { +class FullstackFixture { public: - TCPFixture(EchoTestService::AsyncService* service) { + FullstackFixture(Service* service, const grpc::string& address) { ServerBuilder b; + b.AddListeningPort(address, InsecureServerCredentials()); + cq_ = b.AddCompletionQueue(true); + b.RegisterService(service); + server_ = b.BuildAndStart(); + channel_ = CreateChannel(address, InsecureChannelCredentials()); + } + + ServerCompletionQueue* cq() { return cq_.get(); } + std::shared_ptr channel() { return channel_; } + + private: + std::unique_ptr server_; + std::unique_ptr cq_; + std::shared_ptr channel_; +}; + +class TCP : public FullstackFixture { + public: + TCP(Service* service) : FullstackFixture(service, MakeAddress()) {} + + private: + static grpc::string MakeAddress() { int port = grpc_pick_unused_port_or_die(); std::stringstream addr; addr << "localhost:" << port; - b.AddListeningPort(addr.str(), InsecureServerCredentials()); + return addr.str(); + } +}; + +class UDS : public FullstackFixture { + public: + UDS(Service* service) : FullstackFixture(service, "unix:bm_fullstack") {} +}; + +class EndpointPairFixture { + public: + EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) { + ServerBuilder b; cq_ = b.AddCompletionQueue(true); b.RegisterService(service); server_ = b.BuildAndStart(); - channel_ = CreateChannel(addr.str(), InsecureChannelCredentials()); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + /* add server endpoint to server_ */ + { + const grpc_channel_args* server_args = + grpc_server_get_channel_args(server_->c_server()); + grpc_transport* transport = grpc_create_chttp2_transport( + &exec_ctx, server_args, endpoints.server, 0 /* is_client */); + + grpc_pollset** pollsets; + size_t num_pollsets = 0; + grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets); + + for (size_t i = 0; i < num_pollsets; i++) { + grpc_endpoint_add_to_pollset(&exec_ctx, endpoints.server, pollsets[i]); + } + + grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport, + NULL, server_args); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + } + + /* create channel */ + { + ChannelArguments args; + args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); + + grpc_channel_args c_args = args.c_args(); + grpc_transport* transport = + grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1); + GPR_ASSERT(transport); + grpc_channel* channel = grpc_channel_create( + &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); + + channel_ = CreateChannelInternal("", channel); + } + + grpc_exec_ctx_finish(&exec_ctx); } ServerCompletionQueue* cq() { return cq_.get(); } - std::shared_ptr channel() { return channel_; } private: @@ -78,6 +172,26 @@ class TCPFixture { std::shared_ptr channel_; }; +class SockPair : public EndpointPairFixture { + public: + SockPair(Service* service) + : EndpointPairFixture(service, + grpc_iomgr_create_endpoint_pair("test", 8192)) {} +}; + +class InProcessCHTTP2 : public EndpointPairFixture { + public: + InProcessCHTTP2(Service* service) + : EndpointPairFixture(service, MakeEndpoints()) {} + + private: + grpc_endpoint_pair MakeEndpoints() { + grpc_endpoint_pair p; + grpc_passthru_endpoint_create(&p.client, &p.server); + return p; + } +}; + /******************************************************************************* * BENCHMARKING KERNELS */ @@ -145,7 +259,10 @@ static void BM_UnaryPingPong(benchmark::State& state) { * CONFIGURATIONS */ -BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCPFixture); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair); +BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2); } // namespace testing } // namespace grpc