Merge pull request #9487 from ctiller/bm_stream

One-way streaming microbenchmarks
pull/9580/head
Sree Kuchibhotla 8 years ago committed by GitHub
commit fbe0cf27f1
  1. 2
      test/core/end2end/fuzzers/api_fuzzer.c
  2. 10
      test/core/util/passthru_endpoint.c
  3. 5
      test/core/util/passthru_endpoint.h
  4. 150
      test/cpp/microbenchmarks/bm_fullstack.cc

@ -442,7 +442,7 @@ static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
} else if (g_server != NULL) {
grpc_endpoint *client;
grpc_endpoint *server;
grpc_passthru_endpoint_create(&client, &server, g_resource_quota);
grpc_passthru_endpoint_create(&client, &server, g_resource_quota, NULL);
*fc->ep = client;
grpc_transport *transport =

@ -34,6 +34,7 @@
#include "test/core/util/passthru_endpoint.h"
#include <inttypes.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@ -55,6 +56,9 @@ typedef struct {
struct passthru_endpoint {
gpr_mu mu;
int halves;
grpc_passthru_endpoint_stats *stats;
grpc_passthru_endpoint_stats
dummy_stats; // used if constructor stats == NULL
bool shutdown;
half client;
half server;
@ -86,6 +90,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
half *m = other_half((half *)ep);
gpr_mu_lock(&m->parent->mu);
grpc_error *error = GRPC_ERROR_NONE;
m->parent->stats->num_writes++;
if (m->parent->shutdown) {
error = GRPC_ERROR_CREATE("Endpoint already shutdown");
} else if (m->on_read != NULL) {
@ -188,10 +193,13 @@ static void half_init(half *m, passthru_endpoint *parent,
void grpc_passthru_endpoint_create(grpc_endpoint **client,
grpc_endpoint **server,
grpc_resource_quota *resource_quota) {
grpc_resource_quota *resource_quota,
grpc_passthru_endpoint_stats *stats) {
passthru_endpoint *m = gpr_malloc(sizeof(*m));
m->halves = 2;
m->shutdown = 0;
m->stats = stats == NULL ? &m->dummy_stats : stats;
memset(m->stats, 0, sizeof(*m->stats));
half_init(&m->client, m, resource_quota, "client");
half_init(&m->server, m, resource_quota, "server");
gpr_mu_init(&m->mu);

@ -36,8 +36,11 @@
#include "src/core/lib/iomgr/endpoint.h"
typedef struct { int num_writes; } grpc_passthru_endpoint_stats;
void grpc_passthru_endpoint_create(grpc_endpoint **client,
grpc_endpoint **server,
grpc_resource_quota *resource_quota);
grpc_resource_quota *resource_quota,
grpc_passthru_endpoint_stats *stats);
#endif

@ -130,6 +130,8 @@ class TCP : public FullstackFixture {
public:
TCP(Service* service) : FullstackFixture(service, MakeAddress()) {}
void Finish(benchmark::State& state) {}
private:
static grpc::string MakeAddress() {
int port = grpc_pick_unused_port_or_die();
@ -143,6 +145,8 @@ class UDS : public FullstackFixture {
public:
UDS(Service* service) : FullstackFixture(service, MakeAddress()) {}
void Finish(benchmark::State& state) {}
private:
static grpc::string MakeAddress() {
int port = grpc_pick_unused_port_or_die(); // just for a unique id - not a
@ -228,6 +232,8 @@ class SockPair : public EndpointPairFixture {
: EndpointPairFixture(service, grpc_iomgr_create_endpoint_pair(
"test", initialize_stuff.rq(), 8192)) {
}
void Finish(benchmark::State& state) {}
};
class InProcessCHTTP2 : public EndpointPairFixture {
@ -235,10 +241,20 @@ class InProcessCHTTP2 : public EndpointPairFixture {
InProcessCHTTP2(Service* service)
: EndpointPairFixture(service, MakeEndpoints()) {}
void Finish(benchmark::State& state) {
std::ostringstream out;
out << "writes/iteration:"
<< ((double)stats_.num_writes / (double)state.iterations());
state.SetLabel(out.str());
}
private:
grpc_passthru_endpoint_stats stats_;
grpc_endpoint_pair MakeEndpoints() {
grpc_endpoint_pair p;
grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq());
grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
&stats_);
return p;
}
};
@ -415,6 +431,7 @@ static void BM_UnaryPingPong(benchmark::State& state) {
service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
fixture->cq(), fixture->cq(), tag(slot));
}
fixture->Finish(state);
fixture.reset();
server_env[0]->~ServerEnv();
server_env[1]->~ServerEnv();
@ -422,6 +439,120 @@ static void BM_UnaryPingPong(benchmark::State& state) {
state.range(1) * state.iterations());
}
template <class Fixture>
static void BM_PumpStreamClientToServer(benchmark::State& state) {
EchoTestService::AsyncService service;
std::unique_ptr<Fixture> fixture(new Fixture(&service));
{
EchoRequest send_request;
EchoRequest recv_request;
if (state.range(0) > 0) {
send_request.set_message(std::string(state.range(0), 'a'));
}
Status recv_status;
ServerContext svr_ctx;
ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
fixture->cq(), tag(0));
std::unique_ptr<EchoTestService::Stub> stub(
EchoTestService::NewStub(fixture->channel()));
ClientContext cli_ctx;
auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
int need_tags = (1 << 0) | (1 << 1);
void* t;
bool ok;
while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
GPR_ASSERT(ok);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
response_rw.Read(&recv_request, tag(0));
while (state.KeepRunning()) {
request_rw->Write(send_request, tag(1));
while (true) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
if (t == tag(0)) {
response_rw.Read(&recv_request, tag(0));
} else if (t == tag(1)) {
break;
} else {
GPR_ASSERT(false);
}
}
}
request_rw->WritesDone(tag(1));
need_tags = (1 << 0) | (1 << 1);
while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
}
fixture->Finish(state);
fixture.reset();
state.SetBytesProcessed(state.range(0) * state.iterations());
}
template <class Fixture>
static void BM_PumpStreamServerToClient(benchmark::State& state) {
EchoTestService::AsyncService service;
std::unique_ptr<Fixture> fixture(new Fixture(&service));
{
EchoResponse send_response;
EchoResponse recv_response;
if (state.range(0) > 0) {
send_response.set_message(std::string(state.range(0), 'a'));
}
Status recv_status;
ServerContext svr_ctx;
ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
fixture->cq(), tag(0));
std::unique_ptr<EchoTestService::Stub> stub(
EchoTestService::NewStub(fixture->channel()));
ClientContext cli_ctx;
auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
int need_tags = (1 << 0) | (1 << 1);
void* t;
bool ok;
while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
GPR_ASSERT(ok);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
request_rw->Read(&recv_response, tag(0));
while (state.KeepRunning()) {
response_rw.Write(send_response, tag(1));
while (true) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
if (t == tag(0)) {
request_rw->Read(&recv_response, tag(0));
} else if (t == tag(1)) {
break;
} else {
GPR_ASSERT(false);
}
}
}
response_rw.Finish(Status::OK, tag(1));
need_tags = (1 << 0) | (1 << 1);
while (need_tags) {
GPR_ASSERT(fixture->cq()->Next(&t, &ok));
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
}
fixture->Finish(state);
fixture.reset();
state.SetBytesProcessed(state.range(0) * state.iterations());
}
/*******************************************************************************
* CONFIGURATIONS
*/
@ -494,6 +625,23 @@ BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator,
Server_AddInitialMetadata<RandomAsciiMetadata<10>, 100>)
->Args({0, 0});
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, TCP)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, UDS)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, SockPair)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, InProcessCHTTP2)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, TCP)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, UDS)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
->Range(0, 128 * 1024 * 1024);
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save