Merge pull request #14164 from yang-g/cork_all

Add option to use client side coalescing API in qps test
Yang Gao committed de79daaba5 (via GitHub), 7 years ago
3 changed files:
  1. src/proto/grpc/testing/control.proto (3 lines changed)
  2. test/cpp/qps/client_async.cc (38 lines changed)
  3. test/cpp/qps/client_sync.cc (1 line changed)
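
For context on what "coalescing" buys: with ClientContext::set_initial_metadata_corked(true) the client holds back its initial metadata, and WriteLast() folds the final message and the half-close into the same operation, so a short stream can go out as a single batch. The sketch below is an editor's illustration, not part of this PR: it shows the pattern with the synchronous API against the qps BenchmarkService stub, and the include paths and free-standing function are assumptions rather than code from this change.

#include <memory>

#include <grpcpp/grpcpp.h>

#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"

// Sketch only: one coalesced ping-pong on BenchmarkService.StreamingCall.
void CoalescedPingPong(grpc::testing::BenchmarkService::Stub* stub) {
  grpc::ClientContext context;
  // Hold back initial metadata so it can be batched with the first write.
  context.set_initial_metadata_corked(true);

  auto stream = stub->StreamingCall(&context);

  grpc::testing::SimpleRequest request;
  grpc::testing::SimpleResponse response;
  // WriteLast() behaves like Write() + WritesDone(); with corked metadata the
  // initial metadata, the message, and the half-close travel in one batch.
  stream->WriteLast(request, grpc::WriteOptions());
  stream->Read(&response);
  grpc::Status status = stream->Finish();
  (void)status;
}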

src/proto/grpc/testing/control.proto

@@ -108,6 +108,9 @@ message ClientConfig {
   // Number of messages on a stream before it gets finished/restarted
   int32 messages_per_stream = 18;
+  // Use coalescing API when possible.
+  bool use_coalesce_api = 19;
 }
 message ClientStatus { ClientStats stats = 1; }
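
As a usage sketch (editor's note, not part of the diff): the new field is set on a scenario's ClientConfig like any other protobuf field. The generated C++ setter name below follows the standard protobuf conventions, and messages_per_stream must be nonzero when coalescing is enabled, as the GPR_ASSERT in StartInternal further down enforces.

  grpc::testing::ClientConfig config;
  config.set_rpc_type(grpc::testing::STREAMING);
  config.set_messages_per_stream(10);  // must be nonzero when coalescing
  config.set_use_coalesce_api(true);   // the field added by this PR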

test/cpp/qps/client_async.cc

@@ -82,6 +82,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
         prepare_req_(prepare_req) {}
   ~ClientRpcContextUnaryImpl() override {}
   void Start(CompletionQueue* cq, const ClientConfig& config) override {
+    GPR_ASSERT(!config.use_coalesce_api());  // not supported.
     StartInternal(cq);
   }
   bool RunNextState(bool ok, HistogramEntry* entry) override {
@@ -349,10 +350,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
         next_state_(State::INVALID),
         callback_(on_done),
         next_issue_(next_issue),
-        prepare_req_(prepare_req) {}
+        prepare_req_(prepare_req),
+        coalesce_(false) {}
   ~ClientRpcContextStreamingPingPongImpl() override {}
   void Start(CompletionQueue* cq, const ClientConfig& config) override {
-    StartInternal(cq, config.messages_per_stream());
+    StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());
   }
   bool RunNextState(bool ok, HistogramEntry* entry) override {
     while (true) {
@ -375,7 +377,12 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
}
start_ = UsageTimer::Now();
next_state_ = State::WRITE_DONE;
stream_->Write(req_, ClientRpcContext::tag(this));
if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {
stream_->WriteLast(req_, WriteOptions(),
ClientRpcContext::tag(this));
} else {
stream_->Write(req_, ClientRpcContext::tag(this));
}
return true;
case State::WRITE_DONE:
if (!ok) {
@@ -391,6 +398,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
           if ((messages_per_stream_ != 0) &&
               (++messages_issued_ >= messages_per_stream_)) {
             next_state_ = State::WRITES_DONE_DONE;
+            if (coalesce_) {
+              // WritesDone should have been called on the last Write.
+              // Loop around to call Finish.
+              break;
+            }
             stream_->WritesDone(ClientRpcContext::tag(this));
             return true;
           }
@@ -413,7 +425,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
   void StartNewClone(CompletionQueue* cq) override {
     auto* clone = new ClientRpcContextStreamingPingPongImpl(
         stub_, req_, next_issue_, prepare_req_, callback_);
-    clone->StartInternal(cq, messages_per_stream_);
+    clone->StartInternal(cq, messages_per_stream_, coalesce_);
   }
   void TryCancel() override { context_.TryCancel(); }
@@ -449,14 +461,27 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
   // Allow a limit on number of messages in a stream
   int messages_per_stream_;
   int messages_issued_;
+  // Whether to use coalescing API.
+  bool coalesce_;
-  void StartInternal(CompletionQueue* cq, int messages_per_stream) {
+  void StartInternal(CompletionQueue* cq, int messages_per_stream,
+                     bool coalesce) {
     cq_ = cq;
     messages_per_stream_ = messages_per_stream;
     messages_issued_ = 0;
+    coalesce_ = coalesce;
+    if (coalesce_) {
+      GPR_ASSERT(messages_per_stream_ != 0);
+      context_.set_initial_metadata_corked(true);
+    }
     stream_ = prepare_req_(stub_, &context_, cq);
     next_state_ = State::STREAM_IDLE;
     stream_->StartCall(ClientRpcContext::tag(this));
+    if (coalesce_) {
+      // When the initial metadata is corked, the tag will not come back and
+      // we need to manually drive the state machine.
+      RunNextState(true, nullptr);
+    }
   }
 };
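
A condensed sketch of the corked start above (editor's illustration; the function and tag values are hypothetical, not the qps classes): per the comment in the diff, StartCall() on a corked context does not deliver its tag, so the caller issues the first write immediately, and that write carries the held-back initial metadata.

#include <grpcpp/grpcpp.h>

#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"

// Sketch only: corked async start, assuming a BenchmarkService async stub.
void CoalescedAsyncStart(grpc::testing::BenchmarkService::Stub* stub,
                         grpc::CompletionQueue* cq,
                         const grpc::testing::SimpleRequest& request) {
  grpc::ClientContext ctx;
  ctx.set_initial_metadata_corked(true);  // nothing is sent at StartCall time
  auto stream = stub->PrepareAsyncStreamingCall(&ctx, cq);
  void* start_tag = reinterpret_cast<void*>(1);
  void* write_tag = reinterpret_cast<void*>(2);
  stream->StartCall(start_tag);  // with corked metadata, this tag never completes
  // The first write flushes the initial metadata; WriteLast() also folds in
  // the half-close, so the whole stream start goes out as one batch.
  stream->WriteLast(request, grpc::WriteOptions(), write_tag);
  // ... then Read()/Finish() driven off the completion queue as usual ...
}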
@@ -512,6 +537,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
         prepare_req_(prepare_req) {}
   ~ClientRpcContextStreamingFromClientImpl() override {}
   void Start(CompletionQueue* cq, const ClientConfig& config) override {
+    GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
     StartInternal(cq);
   }
   bool RunNextState(bool ok, HistogramEntry* entry) override {
@@ -641,6 +667,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
         prepare_req_(prepare_req) {}
   ~ClientRpcContextStreamingFromServerImpl() override {}
   void Start(CompletionQueue* cq, const ClientConfig& config) override {
+    GPR_ASSERT(!config.use_coalesce_api());  // not supported
     StartInternal(cq);
   }
   bool RunNextState(bool ok, HistogramEntry* entry) override {
@@ -753,6 +780,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
         prepare_req_(prepare_req) {}
   ~ClientRpcContextGenericStreamingImpl() override {}
   void Start(CompletionQueue* cq, const ClientConfig& config) override {
+    GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
     StartInternal(cq, config.messages_per_stream());
   }
   bool RunNextState(bool ok, HistogramEntry* entry) override {

test/cpp/qps/client_sync.cc

@@ -402,6 +402,7 @@ class SynchronousStreamingBothWaysClient final
 };
 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
+  GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
   switch (config.rpc_type()) {
     case UNARY:
       return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
