diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index c7f206d4255..c7513ed9b58 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -69,10 +69,10 @@ typedef std::chrono::time_point grpc_time; class Client { public: explicit Client(const ClientConfig& config) - : timer_(new Timer), interarrival_timer_() { + : channels_(config.client_channels()), timer_(new Timer), interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { - channels_.emplace_back( - config.server_targets(i % config.server_targets_size()), config); + channels_[i].init(config.server_targets(i % config.server_targets_size()), + config); } request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); request_.set_response_size(config.payload_size()); @@ -81,7 +81,7 @@ class Client { ClientStats Mark() { Histogram latencies; - std::vector to_merge(threads_.size()); + Histogram to_merge[threads_.size()]; // avoid std::vector for old compilers for (size_t i = 0; i < threads_.size(); i++) { threads_[i]->BeginSwap(&to_merge[i]); } @@ -108,12 +108,16 @@ class Client { class ClientChannelInfo { public: - ClientChannelInfo(const grpc::string& target, const ClientConfig& config) - : channel_(CreateTestChannel(target, config.enable_ssl())), - stub_(TestService::NewStub(channel_)) {} + ClientChannelInfo() {} + ClientChannelInfo(const ClientChannelInfo& i): channel_(), stub_() { + GPR_ASSERT(!i.channel_ && !i.stub_); + } + void init(const grpc::string& target, const ClientConfig& config) { + channel_ = CreateTestChannel(target, config.enable_ssl()); + stub_ = TestService::NewStub(channel_); + } ChannelInterface* get_channel() { return channel_.get(); } TestService::Stub* get_stub() { return stub_.get(); } - private: std::shared_ptr channel_; std::unique_ptr stub_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 55f07f78f39..5e26400f063 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -156,7 +156,7 @@ class AsyncClient : public Client { std::function setup_ctx) : Client(config), - channel_lock_(config.client_channels()), + channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), channel_count_(config.client_channels()), @@ -208,6 +208,7 @@ class AsyncClient : public Client { delete ctx; } } + delete[] channel_lock_; } bool ThreadFunc(Histogram* histogram, @@ -336,7 +337,7 @@ class AsyncClient : public Client { std::vector issue_allowed_; // may this thread attempt to issue std::vector next_issue_; // when should it issue? - std::vector channel_lock_; + std::mutex *channel_lock_; // a vector, but avoid std::vector for old compilers std::vector contexts_; // per-channel list of idle contexts int max_outstanding_per_channel_; int channel_count_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 5d05c914e87..11c6daca1de 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -113,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { public: SynchronousStreamingClient(const ClientConfig& config) - : SynchronousClient(config), - context_(num_threads_), - stream_(num_threads_) { + : SynchronousClient(config) { + context_ = new grpc::ClientContext[num_threads_]; + stream_ = new std::unique_ptr< + grpc::ClientReaderWriter>[num_threads_]; for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); @@ -124,12 +125,14 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } ~SynchronousStreamingClient() { EndThreads(); - for (auto stream = stream_.begin(); stream != stream_.end(); stream++) { + for (auto stream = &stream_[0]; stream != &stream_[num_threads_]; stream++) { if (*stream) { (*stream)->WritesDone(); EXPECT_TRUE((*stream)->Finish().ok()); } } + delete[] stream_; + delete[] context_; } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { @@ -144,9 +147,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } private: - std::vector context_; - std::vector>> stream_; + // These are both conceptually std::vector but cannot be for old compilers + // that expect contained classes to support copy constructors + grpc::ClientContext *context_; + std::unique_ptr< + grpc::ClientReaderWriter>* stream_; }; std::unique_ptr CreateSynchronousUnaryClient( diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 4e4562273a4..9cca4c04a19 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -149,19 +149,18 @@ std::unique_ptr RunScenario( // Start servers using runsc::ServerData; - vector servers; + ServerData servers[num_servers]; for (size_t i = 0; i < num_servers; i++) { - ServerData sd; - sd.stub = std::move(Worker::NewStub( + servers[i].stub = std::move(Worker::NewStub( CreateChannel(workers[i], InsecureCredentials(), ChannelArguments()))); ServerArgs args; result_server_config = server_config; result_server_config.set_host(workers[i]); *args.mutable_setup() = server_config; - sd.stream = std::move(sd.stub->RunServer(runsc::AllocContext(&contexts))); - GPR_ASSERT(sd.stream->Write(args)); + servers[i].stream = std::move(servers[i].stub->RunServer(runsc::AllocContext(&contexts))); + GPR_ASSERT(servers[i].stream->Write(args)); ServerStatus init_status; - GPR_ASSERT(sd.stream->Read(&init_status)); + GPR_ASSERT(servers[i].stream->Read(&init_status)); char* host; char* driver_port; char* cli_target; @@ -171,27 +170,22 @@ std::unique_ptr RunScenario( gpr_free(host); gpr_free(driver_port); gpr_free(cli_target); - - servers.push_back(std::move(sd)); } // Start clients using runsc::ClientData; - vector clients; + ClientData clients[num_clients]; for (size_t i = 0; i < num_clients; i++) { - ClientData cd; - cd.stub = std::move(Worker::NewStub(CreateChannel( + clients[i].stub = std::move(Worker::NewStub(CreateChannel( workers[i + num_servers], InsecureCredentials(), ChannelArguments()))); ClientArgs args; result_client_config = client_config; result_client_config.set_host(workers[i + num_servers]); *args.mutable_setup() = client_config; - cd.stream = std::move(cd.stub->RunTest(runsc::AllocContext(&contexts))); - GPR_ASSERT(cd.stream->Write(args)); + clients[i].stream = std::move(clients[i].stub->RunTest(runsc::AllocContext(&contexts))); + GPR_ASSERT(clients[i].stream->Write(args)); ClientStatus init_status; - GPR_ASSERT(cd.stream->Read(&init_status)); - - clients.push_back(std::move(cd)); + GPR_ASSERT(clients[i].stream->Read(&init_status)); } // Let everything warmup @@ -206,18 +200,18 @@ std::unique_ptr RunScenario( server_mark.mutable_mark(); ClientArgs client_mark; client_mark.mutable_mark(); - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Write(server_mark)); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Write(client_mark)); } ServerStatus server_status; ClientStatus client_status; - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Read(&server_status)); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Read(&client_status)); } @@ -231,19 +225,19 @@ std::unique_ptr RunScenario( result->client_config = result_client_config; result->server_config = result_server_config; gpr_log(GPR_INFO, "Finishing"); - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Write(server_mark)); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Write(client_mark)); } - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Read(&server_status)); const auto& stats = server_status.stats(); result->server_resources.emplace_back( stats.time_elapsed(), stats.time_user(), stats.time_system()); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Read(&client_status)); const auto& stats = client_status.stats(); result->latencies.MergeProto(stats.latencies()); @@ -251,11 +245,11 @@ std::unique_ptr RunScenario( stats.time_elapsed(), stats.time_user(), stats.time_system()); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->WritesDone()); GPR_ASSERT(client->stream->Finish().ok()); } - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->WritesDone()); GPR_ASSERT(server->stream->Finish().ok()); } diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index d5348463650..7c85f81b5e8 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -36,6 +36,7 @@ #include #include +#include #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h"