pull/4670/head
Vijay Pai 9 years ago
parent 994f8f76e7
commit e488668075
  1. 121
      test/cpp/qps/client.h
  2. 23
      test/cpp/qps/client_async.cc
  3. 7
      test/cpp/qps/client_sync.cc

@ -37,6 +37,9 @@
#include <condition_variable> #include <condition_variable>
#include <mutex> #include <mutex>
#include <grpc++/support/byte_buffer.h>
#include <grpc++/support/slice.h>
#include "test/cpp/qps/histogram.h" #include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h" #include "test/cpp/qps/timer.h"
@ -74,7 +77,7 @@ void CreateRequest(RequestType *req, const PayloadConfig&) {
// check since these only happen at the beginning anyway // check since these only happen at the beginning anyway
GPR_ASSERT(false); GPR_ASSERT(false);
} }
template <> template <>
void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& payload_config) { void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& payload_config) {
if (payload_config.has_bytebuf_params()) { if (payload_config.has_bytebuf_params()) {
@ -96,11 +99,15 @@ void CreateRequest<SimpleRequest>(SimpleRequest *req, const PayloadConfig& paylo
} }
} }
template <> template <>
void CreateRequest<ByteBuffer>(ByteBuffer *req, const PayloadConfig& payload_config) { void CreateRequest<ByteBuffer>(ByteBuffer *req,
const PayloadConfig& payload_config) {
if (payload_config.has_bytebuf_params()) { if (payload_config.has_bytebuf_params()) {
if (payload_config.req_size() > 0) { if (payload_config.bytebuf_params().req_size() > 0) {
std::unique_ptr<char> buf(new char[payload_config.req_size()]); std::unique_ptr<char>
gpr_slice_from_copied_buffer(buf.get(), payload_config.req_size()); buf(new char[payload_config.bytebuf_params().req_size()]);
gpr_slice s =
gpr_slice_from_copied_buffer(buf.get(),
payload_config.bytebuf_params().req_size());
Slice slice(s, Slice::STEAL_REF); Slice slice(s, Slice::STEAL_REF);
std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1)); std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1));
req->MoveFrom(bbuf.get()); req->MoveFrom(bbuf.get());
@ -110,24 +117,11 @@ void CreateRequest<ByteBuffer>(ByteBuffer *req, const PayloadConfig& payload_con
} }
} }
} }
template <class StubType, class RequestType>
class Client { class Client {
public: public:
Client(const ClientConfig& config, Client() : timer_(new Timer), interarrival_timer_() {}
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) virtual ~Client();
: channels_(config.client_channels()),
create_stub_(create_stub),
timer_(new Timer),
interarrival_timer_() {
for (int i = 0; i < config.client_channels(); i++) {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
}
ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config());
}
virtual ~Client() {}
ClientStats Mark(bool reset) { ClientStats Mark(bool reset) {
Histogram latencies; Histogram latencies;
@ -162,40 +156,9 @@ class Client {
stats.set_time_user(timer_result.user); stats.set_time_user(timer_result.user);
return stats; return stats;
} }
protected: protected:
RequestType request_;
bool closed_loop_; bool closed_loop_;
class ClientChannelInfo {
public:
ClientChannelInfo() {}
ClientChannelInfo(const ClientChannelInfo& i) {
// The copy constructor is to satisfy old compilers
// that need it for using std::vector . It is only ever
// used for empty entries
GPR_ASSERT(!i.channel_ && !i.stub_);
}
void init(const grpc::string& target, const ClientConfig& config) {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
channel_ = CreateTestChannel(
target, config.security_params().server_host_override(),
config.has_security_params(),
!config.security_params().use_test_ca());
stub_ = create_stub_(channel_);
}
Channel* get_channel() { return channel_.get(); }
StubType* get_stub() { return stub_.get(); }
private:
std::shared_ptr<Channel> channel_;
std::unique_ptr<StubType> stub_;
};
std::vector<ClientChannelInfo> channels_;
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_;
void StartThreads(size_t num_threads) { void StartThreads(size_t num_threads) {
for (size_t i = 0; i < num_threads; i++) { for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i)); threads_.emplace_back(new Thread(this, i));
@ -264,7 +227,6 @@ class Client {
return true; return true;
} }
} }
private: private:
class Thread { class Thread {
public: public:
@ -326,8 +288,6 @@ class Client {
} }
} }
BenchmarkService::Stub* stub_;
ClientConfig config_;
std::mutex mu_; std::mutex mu_;
std::condition_variable cv_; std::condition_variable cv_;
bool done_; bool done_;
@ -337,7 +297,7 @@ class Client {
size_t idx_; size_t idx_;
std::thread impl_; std::thread impl_;
}; };
std::vector<std::unique_ptr<Thread>> threads_; std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<Timer> timer_; std::unique_ptr<Timer> timer_;
@ -345,6 +305,55 @@ class Client {
std::vector<grpc_time> next_time_; std::vector<grpc_time> next_time_;
}; };
template <class StubType, class RequestType>
class ClientImpl : public Client {
public:
ClientImpl(const ClientConfig& config,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub)
: channels_(config.client_channels()),
create_stub_(create_stub) {
for (int i = 0; i < config.client_channels(); i++) {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
}
ClientRequestCreation::CreateRequest<RequestType>(&request_, config.payload_config());
}
virtual ~ClientImpl() {}
protected:
RequestType request_;
class ClientChannelInfo {
public:
ClientChannelInfo() {}
ClientChannelInfo(const ClientChannelInfo& i) {
// The copy constructor is to satisfy old compilers
// that need it for using std::vector . It is only ever
// used for empty entries
GPR_ASSERT(!i.channel_ && !i.stub_);
}
void init(const grpc::string& target, const ClientConfig& config) {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
channel_ = CreateTestChannel(
target, config.security_params().server_host_override(),
config.has_security_params(),
!config.security_params().use_test_ca());
stub_ = create_stub_(channel_);
}
Channel* get_channel() { return channel_.get(); }
StubType* get_stub() { return stub_.get(); }
private:
std::shared_ptr<Channel> channel_;
std::unique_ptr<StubType> stub_;
};
std::vector<ClientChannelInfo> channels_;
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
std::unique_ptr<Client> CreateSynchronousStreamingClient( std::unique_ptr<Client> CreateSynchronousStreamingClient(
const ClientConfig& args); const ClientConfig& args);

@ -47,6 +47,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <grpc++/client_context.h> #include <grpc++/client_context.h>
#include <grpc++/generic/generic_stub.h>
#include "test/cpp/qps/timer.h" #include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h" #include "test/cpp/qps/client.h"
@ -148,13 +149,20 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
typedef std::forward_list<ClientRpcContext*> context_list; typedef std::forward_list<ClientRpcContext*> context_list;
template <class StubType, class RequestType> template <class StubType, class RequestType>
class AsyncClient : public Client<StubType, RequestType> { class AsyncClient : public ClientImpl<StubType, RequestType> {
// Specify which protected members we are using since there is no
// member name resolution until the template types are fully resolved
public: public:
using Client::SetupLoadTest;
using Client::NextIssueTime;
using Client::closed_loop_;
using ClientImpl<StubType,RequestType>::channels_;
using ClientImpl<StubType,RequestType>::request_;
AsyncClient( AsyncClient(
const ClientConfig& config, const ClientConfig& config,
std::function<ClientRpcContext*(int, StubType*, const RequestType&)> setup_ctx, std::function<ClientRpcContext*(int, StubType*, const RequestType&)> setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub)
: Client(config, create_stub), : ClientImpl<StubType,RequestType>(config, create_stub),
channel_lock_(new std::mutex[config.client_channels()]), channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()), contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
@ -344,7 +352,8 @@ class AsyncClient : public Client<StubType, RequestType> {
int pref_channel_inc_; int pref_channel_inc_;
}; };
class AsyncUnaryClient GRPC_FINAL : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { class AsyncUnaryClient GRPC_FINAL :
public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
public: public:
explicit AsyncUnaryClient(const ClientConfig& config) explicit AsyncUnaryClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) {
@ -559,10 +568,10 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient<grpc::GenericS
}; };
static ClientRpcContext* SetupCtx(int channel_id, static ClientRpcContext* SetupCtx(int channel_id,
grpc::GenericStub* stub, grpc::GenericStub* stub,
const SimpleRequest& req) { const ByteBuffer& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( return new ClientGenericRpcContextStreamingImpl(
channel_id, stub, req, AsyncStreamingClient::StartReq, channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
AsyncStreamingClient::CheckDone); GenericAsyncStreamingClient::CheckDone);
} }
}; };

@ -64,9 +64,12 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
class SynchronousClient : public Client { class SynchronousClient :
public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
public: public:
SynchronousClient(const ClientConfig& config) : Client(config) { SynchronousClient(const ClientConfig& config) :
ClientImpl<BenchmarkService::Stub,
SimpleRequest>(config, BenchmarkService::NewStub) {
num_threads_ = num_threads_ =
config.outstanding_rpcs_per_channel() * config.client_channels(); config.outstanding_rpcs_per_channel() * config.client_channels();
responses_.resize(num_threads_); responses_.resize(num_threads_);

Loading…
Cancel
Save