Keep code that is common between both async client versions in a new async superclass and only use unary/streaming-specific material in the subclass.

pull/1380/head
Vijay Pai 10 years ago
parent 6d42a73bb9
commit e10ebf15eb
  1. 133
      test/cpp/qps/client_async.cc

@ -130,39 +130,26 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
response_reader_; response_reader_;
}; };
class AsyncUnaryClient GRPC_FINAL : public Client { class AsyncClient : public Client {
public: public:
explicit AsyncUnaryClient(const ClientConfig& config) : Client(config) { explicit AsyncClient(const ClientConfig& config,
void (*setup_ctx)(CompletionQueue*, TestService::Stub*,
const SimpleRequest&)) :
Client(config) {
for (int i = 0; i < config.async_client_threads(); i++) { for (int i = 0; i < config.async_client_threads(); i++) {
cli_cqs_.emplace_back(new CompletionQueue); cli_cqs_.emplace_back(new CompletionQueue);
} }
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
int t = 0; int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto channel = channels_.begin(); channel != channels_.end(); for (auto channel = channels_.begin(); channel != channels_.end();
channel++) { channel++) {
auto* cq = cli_cqs_[t].get(); auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size(); t = (t + 1) % cli_cqs_.size();
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, (*setup_ctx)(cq, channel->get_stub(), request_);
const SimpleRequest& request, void* tag) {
return stub->AsyncUnaryCall(ctx, request, cq, tag);
};
TestService::Stub* stub = channel->get_stub();
const SimpleRequest& request = request_;
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, request, start_req, check_done);
} }
} }
StartThreads(config.async_client_threads());
} }
virtual ~AsyncClient() {
~AsyncUnaryClient() GRPC_OVERRIDE {
EndThreads();
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown(); (*cq)->Shutdown();
void* got_tag; void* got_tag;
@ -173,10 +160,13 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
} }
} }
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { bool ThreadFunc(Histogram* histogram, size_t thread_idx)
GRPC_OVERRIDE GRPC_FINAL {
void* got_tag; void* got_tag;
bool ok; bool ok;
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) { switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok,
std::chrono::system_clock::now() +
std::chrono::seconds(1))) {
case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT: return true; case CompletionQueue::TIMEOUT: return true;
case CompletionQueue::GOT_EVENT: break; case CompletionQueue::GOT_EVENT: break;
@ -192,10 +182,30 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
return true; return true;
} }
private:
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
}; };
// Unary-call flavor of the async benchmark client. All completion-queue
// plumbing and the event-polling ThreadFunc live in AsyncClient; this
// subclass only supplies the context factory for unary RPCs.
class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
 public:
  explicit AsyncUnaryClient(const ClientConfig& config) :
      AsyncClient(config, SetupCtx) {
    StartThreads(config.async_client_threads());
  }
  ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
 private:
  // Allocates one self-managed unary RPC context bound to |cq|. The context
  // is intentionally not owned here; it is deleted when the completion queue
  // is drained during shutdown (see ~AsyncClient).
  static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
                       const SimpleRequest& req) {
    // Kicks off the actual unary call on the captured completion queue.
    auto issue_call = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
                           const SimpleRequest& request, void* tag) {
      return stub->AsyncUnaryCall(ctx, request, cq, tag);
    };
    // Benchmark client ignores per-call results; latency is recorded by the
    // context state machine itself.
    auto finish_cb = [](grpc::Status s, SimpleResponse* response) {};
    new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
        stub, req, issue_call, finish_cb);
  }
};
template <class RequestType, class ResponseType> template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext { class ClientRpcContextStreamingImpl : public ClientRpcContext {
public: public:
@ -241,7 +251,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return(false); return(false);
} }
next_state_ = &ClientRpcContextStreamingImpl::ReadDone; next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this)); stream_->Read(&response_, ClientRpcContext::tag(this));
return true; return true;
} }
bool ReadDone(bool ok, Histogram *hist) { bool ReadDone(bool ok, Histogram *hist) {
@ -263,71 +273,26 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_; stream_;
}; };
class AsyncStreamingClient GRPC_FINAL : public Client { class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public: public:
explicit AsyncStreamingClient(const ClientConfig &config) : Client(config) { explicit AsyncStreamingClient(const ClientConfig &config) :
for (int i = 0; i < config.async_client_threads(); i++) { AsyncClient(config, SetupCtx) {
cli_cqs_.emplace_back(new CompletionQueue);
}
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
void *tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
TestService::Stub *stub = channel->get_stub();
const SimpleRequest &request = request_;
new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
stub, request, start_req, check_done);
}
}
StartThreads(config.async_client_threads()); StartThreads(config.async_client_threads());
} }
~AsyncStreamingClient() GRPC_OVERRIDE { ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
EndThreads(); private:
static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { const SimpleRequest& req) {
(*cq)->Shutdown(); auto check_done = [](grpc::Status s, SimpleResponse* response) {};
void *got_tag; auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
bool ok; void *tag) {
while ((*cq)->Next(&got_tag, &ok)) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
delete ClientRpcContext::detag(got_tag); return stream;
} };
} new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
} stub, req, start_req, check_done);
bool ThreadFunc(Histogram *histogram, size_t thread_idx) GRPC_OVERRIDE {
void *got_tag;
bool ok;
switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, std::chrono::system_clock::now() + std::chrono::seconds(1))) {
case CompletionQueue::SHUTDOWN: return false;
case CompletionQueue::TIMEOUT: return true;
case CompletionQueue::GOT_EVENT: break;
}
ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState(ok, histogram) == false) {
// call the callback and then delete it
ctx->RunNextState(ok, histogram);
ctx->StartNewClone();
delete ctx;
}
return true;
} }
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
}; };
std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) { std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {

Loading…
Cancel
Save