clang-format all these files

changes/30/217530/1
Vijay Pai 10 years ago
parent 56c5129629
commit 64ac47f389
  1. 63
      test/cpp/end2end/async_end2end_test.cc
  2. 230
      test/cpp/qps/client_async.cc
  3. 43
      test/cpp/qps/server_async.cc

@ -65,9 +65,7 @@ namespace testing {
namespace {
void* tag(int i) {
return (void*)(gpr_intptr)i;
}
void* tag(int i) { return (void*)(gpr_intptr)i; }
void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
bool ok;
@ -109,18 +107,10 @@ class AsyncEnd2endTest : public ::testing::Test {
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
void server_ok(int i) {
verify_ok(&srv_cq_, i, true);
}
void client_ok(int i) {
verify_ok(&cli_cq_, i , true);
}
void server_fail(int i) {
verify_ok(&srv_cq_, i, false);
}
void client_fail(int i) {
verify_ok(&cli_cq_, i, false);
}
void server_ok(int i) { verify_ok(&srv_cq_, i, true); }
void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
void server_fail(int i) { verify_ok(&srv_cq_, i, false); }
void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
void SendRpc(int num_rpcs) {
for (int i = 0; i < num_rpcs; i++) {
@ -135,12 +125,11 @@ class AsyncEnd2endTest : public ::testing::Test {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
response_reader(stub_->AsyncEcho(
&cli_ctx, send_request, &cli_cq_, tag(1)));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
@ -193,8 +182,7 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1)));
service_.RequestRequestStream(
&srv_ctx, &srv_stream, &srv_cq_, tag(2));
service_.RequestRequestStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2));
server_ok(2);
client_ok(1);
@ -247,8 +235,8 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestResponseStream(
&srv_ctx, &recv_request, &srv_stream, &srv_cq_, tag(2));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, &srv_cq_,
tag(2));
server_ok(2);
client_ok(1);
@ -298,8 +286,7 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1)));
service_.RequestBidiStream(
&srv_ctx, &srv_stream, &srv_cq_, tag(2));
service_.RequestBidiStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2));
server_ok(2);
client_ok(1);
@ -357,8 +344,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@ -399,8 +386,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
@ -447,8 +434,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
@ -462,7 +449,6 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
server_ok(4);
response_reader->Finish(&recv_response, &recv_status, tag(5));
client_ok(5);
EXPECT_EQ(send_response.message(), recv_response.message());
@ -491,10 +477,12 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
std::pair<grpc::string, grpc::string> meta2(
"key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
std::pair<grpc::string, grpc::string> meta3("key3", "val3");
std::pair<grpc::string, grpc::string> meta6("key4-bin",
std::pair<grpc::string, grpc::string> meta6(
"key4-bin",
{"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
std::pair<grpc::string, grpc::string> meta5("key5", "val5");
std::pair<grpc::string, grpc::string> meta4("key6-bin",
std::pair<grpc::string, grpc::string> meta4(
"key6-bin",
{"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
cli_ctx.AddMetadata(meta1.first, meta1.second);
@ -503,8 +491,8 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@ -531,7 +519,6 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
server_ok(5);
response_reader->Finish(&recv_response, &recv_status, tag(6));
client_ok(6);
EXPECT_EQ(send_response.message(), recv_response.message());

@ -84,8 +84,8 @@ using grpc::testing::TestService;
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google { }
namespace gflags { }
namespace google {}
namespace gflags {}
using namespace google;
using namespace gflags;
@ -94,63 +94,67 @@ static double now() {
return 1e9 * tv.tv_sec + tv.tv_nsec;
}
class ClientRpcContext {
public:
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
virtual bool operator()() = 0; // do next state, return false if steps done
static void *tag(ClientRpcContext *c) {return reinterpret_cast<void *>(c);}
static ClientRpcContext *detag(void *t) {
return reinterpret_cast<ClientRpcContext *>(t);
}
virtual void report_stats(gpr_histogram *hist) = 0;
};
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(const RequestType& req,
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<
ResponseType>>(grpc::ClientContext *,
const RequestType&, void *)> start_req,
std::function<void(grpc::Status, ResponseType *)> on_done):
context_(), req_(req), response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent),
callback_(on_done),
start_(now()),
response_reader_(start_req(&context_, req_,
ClientRpcContext::tag(this))) {
}
~ClientRpcContextUnaryImpl() override {}
bool operator()() override {return (this->*next_state_)();}
void report_stats(gpr_histogram *hist) override {
gpr_histogram_add(hist, now()-start_);
}
private:
bool ReqSent() {
next_state_ = &ClientRpcContextUnaryImpl::RespDone;
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
return true;
}
bool RespDone() {
next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
return false;
}
bool DoCallBack() {
callback_(status_, &response_);
return false;
}
grpc::ClientContext context_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)();
std::function<void(grpc::Status, ResponseType *)> callback_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_;
};
class ClientRpcContext {
public:
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
virtual bool operator()() = 0; // do next state, return false if steps done
static void *tag(ClientRpcContext *c) { return reinterpret_cast<void *>(c); }
static ClientRpcContext *detag(void *t) {
return reinterpret_cast<ClientRpcContext *>(t);
}
virtual void report_stats(gpr_histogram *hist) = 0;
};
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(
const RequestType &req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
grpc::ClientContext *, const RequestType &, void *)> start_req,
std::function<void(grpc::Status, ResponseType *)> on_done)
: context_(),
req_(req),
response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent),
callback_(on_done),
start_(now()),
response_reader_(
start_req(&context_, req_, ClientRpcContext::tag(this))) {}
~ClientRpcContextUnaryImpl() override {}
bool operator()() override { return (this->*next_state_)(); }
void report_stats(gpr_histogram *hist) override {
gpr_histogram_add(hist, now() - start_);
}
private:
bool ReqSent() {
next_state_ = &ClientRpcContextUnaryImpl::RespDone;
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
return true;
}
bool RespDone() {
next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
return false;
}
bool DoCallBack() {
callback_(status_, &response_);
return false;
}
grpc::ClientContext context_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)();
std::function<void(grpc::Status, ResponseType *)> callback_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
response_reader_;
};
static void RunTest(const int client_threads, const int client_channels,
const int num_rpcs, const int payload_size) {
const int num_rpcs, const int payload_size) {
gpr_log(GPR_INFO,
"QPS test with parameters\n"
"enable_ssl = %d\n"
@ -197,71 +201,65 @@ static void RunTest(const int client_threads, const int client_channels,
grpc_profiler_start("qps_client_async.prof");
auto CheckDone = [=](grpc::Status s, SimpleResponse *response) {
GPR_ASSERT(s.IsOk() &&
(response->payload().type() ==
grpc::testing::PayloadType::COMPRESSABLE) &&
(response->payload().body().length() ==
static_cast<size_t>(payload_size)));
GPR_ASSERT(s.IsOk() && (response->payload().type() ==
grpc::testing::PayloadType::COMPRESSABLE) &&
(response->payload().body().length() ==
static_cast<size_t>(payload_size)));
};
for (int i = 0; i < client_threads; i++) {
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
GPR_ASSERT(hist != NULL);
thread_stats[i] = hist;
threads.push_back(
std::thread([hist, client_threads, client_channels, num_rpcs,
payload_size, &channels, &CheckDone](int channel_num) {
using namespace std::placeholders;
SimpleRequest request;
request.set_response_type(
grpc::testing::PayloadType::COMPRESSABLE);
request.set_response_size(payload_size);
grpc::CompletionQueue cli_cq;
int rpcs_sent=0;
while (rpcs_sent < num_rpcs) {
rpcs_sent++;
TestService::Stub *stub =
channels[channel_num].get_stub();
grpc::ClientContext context;
auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall,
stub, _1, _2, &cli_cq, _3);
new ClientRpcContextUnaryImpl<SimpleRequest,
SimpleResponse>(request,
start_req,
CheckDone);
void *got_tag;
bool ok;
// Need to call 2 next for every 1 RPC (1 for req done, 1 for resp done)
cli_cq.Next(&got_tag,&ok);
if (!ok)
break;
ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
if ((*ctx)() == false) {
// call the callback and then delete it
(*ctx)();
delete ctx;
}
cli_cq.Next(&got_tag,&ok);
if (!ok)
break;
ctx = ClientRpcContext::detag(got_tag);
if ((*ctx)() == false) {
// call the callback and then delete it
ctx->report_stats(hist);
(*ctx)();
delete ctx;
}
// Now do runtime round-robin assignment of the next
// channel number
channel_num += client_threads;
channel_num %= client_channels;
}
},
i % client_channels));
threads.push_back(std::thread(
[hist, client_threads, client_channels, num_rpcs, payload_size,
&channels, &CheckDone](int channel_num) {
using namespace std::placeholders;
SimpleRequest request;
request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request.set_response_size(payload_size);
grpc::CompletionQueue cli_cq;
int rpcs_sent = 0;
while (rpcs_sent < num_rpcs) {
rpcs_sent++;
TestService::Stub *stub = channels[channel_num].get_stub();
grpc::ClientContext context;
auto start_req = std::bind(&TestService::Stub::AsyncUnaryCall, stub,
_1, _2, &cli_cq, _3);
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request, start_req, CheckDone);
void *got_tag;
bool ok;
// Need to call 2 next for every 1 RPC (1 for req done, 1 for resp
// done)
cli_cq.Next(&got_tag, &ok);
if (!ok) break;
ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
if ((*ctx)() == false) {
// call the callback and then delete it
(*ctx)();
delete ctx;
}
cli_cq.Next(&got_tag, &ok);
if (!ok) break;
ctx = ClientRpcContext::detag(got_tag);
if ((*ctx)() == false) {
// call the callback and then delete it
ctx->report_stats(hist);
(*ctx)();
delete ctx;
}
// Now do runtime round-robin assignment of the next
// channel number
channel_num += client_threads;
channel_num %= client_channels;
}
},
i % client_channels));
}
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);

@ -124,10 +124,12 @@ class AsyncQpsServerTest {
std::bind(&TestService::AsyncService::RequestCollectServerStats,
&async_service_, _1, _2, _3, &srv_cq_, _4);
for (int i = 0; i < 100; i++) {
contexts_.push_front(new ServerRpcContextUnaryImpl<SimpleRequest,
SimpleResponse>(request_unary_, UnaryCall));
contexts_.push_front(new ServerRpcContextUnaryImpl<StatsRequest,
ServerStats>(request_stats_, CollectServerStats));
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, UnaryCall));
contexts_.push_front(
new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>(
request_stats_, CollectServerStats));
}
}
~AsyncQpsServerTest() {
@ -151,12 +153,12 @@ class AsyncQpsServerTest {
void *got_tag;
while (srv_cq_.Next(&got_tag, &ok)) {
EXPECT_EQ(ok, true);
ServerRpcContext *ctx = detag(got_tag);
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
if ((*ctx)() == false) {
// this RPC context is done, so refresh it
ctx->refresh();
}
// this RPC context is done, so refresh it
ctx->refresh();
}
}
return;
}));
@ -165,13 +167,14 @@ class AsyncQpsServerTest {
std::this_thread::sleep_for(std::chrono::seconds(5));
}
}
private:
class ServerRpcContext {
public:
public:
ServerRpcContext() {}
virtual ~ServerRpcContext() {};
virtual bool operator()() = 0; // do next state, return false if all done
virtual void refresh() = 0; // start this back at a clean state
virtual ~ServerRpcContext(){};
virtual bool operator()() = 0; // do next state, return false if all done
virtual void refresh() = 0; // start this back at a clean state
};
static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void *>(func);
@ -192,25 +195,26 @@ class AsyncQpsServerTest {
: next_state_(&ServerRpcContextUnaryImpl::invoker),
request_method_(request_method),
invoke_method_(invoke_method),
response_writer_(&srv_ctx_) {
response_writer_(&srv_ctx_) {
request_method_(&srv_ctx_, &req_, &response_writer_,
AsyncQpsServerTest::tag(this));
AsyncQpsServerTest::tag(this));
}
~ServerRpcContextUnaryImpl() override {}
bool operator()() override {return (this->*next_state_)();}
bool operator()() override { return (this->*next_state_)(); }
void refresh() override {
srv_ctx_ = ServerContext();
req_ = RequestType();
response_writer_ =
grpc::ServerAsyncResponseWriter<ResponseType>(&srv_ctx_);
grpc::ServerAsyncResponseWriter<ResponseType>(&srv_ctx_);
// Then request the method
next_state_ = &ServerRpcContextUnaryImpl::invoker;
request_method_(&srv_ctx_, &req_, &response_writer_,
AsyncQpsServerTest::tag(this));
AsyncQpsServerTest::tag(this));
}
private:
bool finisher() {return false;}
bool finisher() { return false; }
bool invoker() {
ResponseType response;
@ -219,8 +223,7 @@ class AsyncQpsServerTest {
// Have the response writer work and invoke on_finish when done
next_state_ = &ServerRpcContextUnaryImpl::finisher;
response_writer_.Finish(response, status,
AsyncQpsServerTest::tag(this));
response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this));
return true;
}
ServerContext srv_ctx_;

Loading…
Cancel
Save