diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 590d56d8d00..1ed3c7157fc 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -47,7 +47,6 @@ #include #include #include -#include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/qpstest.pb.h" #include "test/cpp/qps/timer.h" diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index e4ee45a72d0..77da1725ffa 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -53,7 +53,6 @@ #include #include #include -#include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/qpstest.pb.h" diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc deleted file mode 100644 index 258c5e145b8..00000000000 --- a/test/cpp/qps/server.cc +++ /dev/null @@ -1,172 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include -#include -#include -#include - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "src/cpp/server/thread_pool.h" -#include "test/core/util/grpc_profiler.h" -#include "test/cpp/qps/qpstest.pb.h" - -#include -#include - -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); - -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ThreadPool; -using grpc::testing::Payload; -using grpc::testing::PayloadType; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; -using grpc::Status; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static bool got_sigint = false; - -static void sigint_handler(int x) { got_sigint = 1; } - -static double time_double(struct timeval* tv) { - return tv->tv_sec + 1e-6 * tv->tv_usec; -} - -static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - -namespace { - -class TestServiceImpl GRPC_FINAL : public TestService::Service { - public: - Status CollectServerStats(ServerContext* context, const StatsRequest*, - ServerStats* response) { - struct rusage usage; - struct timeval tv; - gettimeofday(&tv, NULL); - getrusage(RUSAGE_SELF, &usage); - response->set_time_now(time_double(&tv)); - response->set_time_user(time_double(&usage.ru_utime)); - response->set_time_system(time_double(&usage.ru_stime)); - return Status::OK; - } - Status UnaryCall(ServerContext* context, const SimpleRequest* request, - SimpleResponse* response) { - if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } - } - return Status::OK; - } -}; - -} // namespace - -static void RunServer() { - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); - - TestServiceImpl service; - - SimpleRequest request; - SimpleResponse response; - - ServerBuilder builder; - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); - builder.RegisterService(&service); - - std::unique_ptr pool(new ThreadPool(FLAGS_server_threads)); - builder.SetThreadPool(pool.get()); - - std::unique_ptr server(builder.BuildAndStart()); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); - - grpc_profiler_start("qps_server.prof"); - - while (!got_sigint) { - sleep(5); - } - - grpc_profiler_stop(); - - gpr_free(server_address); -} - -int main(int argc, char** argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - - signal(SIGINT, sigint_handler); - - GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); - RunServer(); - - grpc_shutdown(); - return 0; -} diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 83bb08cd498..65c170af811 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -52,7 +52,6 @@ #include #include #include "src/cpp/server/thread_pool.h" -#include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" #include "test/cpp/qps/server.h" diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 6724b8f79af..99644299010 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -47,7 +47,6 @@ #include #include #include "src/cpp/server/thread_pool.h" -#include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" #include "test/cpp/qps/server.h" #include "test/cpp/qps/timer.h" diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index dddc4c98507..378151c5711 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -109,6 +109,60 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { return Status(RESOURCE_EXHAUSTED); } + grpc_profiler_start("qps_client.prof"); + Status ret = RunTestBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + Status RunServer(ServerContext* ctx, + ServerReaderWriter* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + grpc_profiler_start("qps_server.prof"); + Status ret = RunServerBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + private: + // Protect against multiple clients using this worker at once. + class InstanceGuard { + public: + InstanceGuard(WorkerImpl* impl) + : impl_(impl), acquired_(impl->TryAcquireInstance()) {} + ~InstanceGuard() { + if (acquired_) { + impl_->ReleaseInstance(); + } + } + + bool Acquired() const { return acquired_; } + + private: + WorkerImpl* const impl_; + const bool acquired_; + }; + + bool TryAcquireInstance() { + std::lock_guard g(mu_); + if (acquired_) return false; + acquired_ = true; + return true; + } + + void ReleaseInstance() { + std::lock_guard g(mu_); + GPR_ASSERT(acquired_); + acquired_ = false; + } + + Status RunTestBody(ServerContext* ctx, + ServerReaderWriter* stream) { ClientArgs args; if (!stream->Read(&args)) { return Status(INVALID_ARGUMENT); @@ -135,14 +189,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { return Status::OK; } - Status RunServer(ServerContext* ctx, - ServerReaderWriter* stream) - GRPC_OVERRIDE { - InstanceGuard g(this); - if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); - } - + Status RunServerBody(ServerContext* ctx, + ServerReaderWriter* stream) { ServerArgs args; if (!stream->Read(&args)) { return Status(INVALID_ARGUMENT); @@ -170,38 +218,6 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { return Status::OK; } - private: - // Protect against multiple clients using this worker at once. - class InstanceGuard { - public: - InstanceGuard(WorkerImpl* impl) - : impl_(impl), acquired_(impl->TryAcquireInstance()) {} - ~InstanceGuard() { - if (acquired_) { - impl_->ReleaseInstance(); - } - } - - bool Acquired() const { return acquired_; } - - private: - WorkerImpl* const impl_; - const bool acquired_; - }; - - bool TryAcquireInstance() { - std::lock_guard g(mu_); - if (acquired_) return false; - acquired_ = true; - return true; - } - - void ReleaseInstance() { - std::lock_guard g(mu_); - GPR_ASSERT(acquired_); - acquired_ = false; - } - std::mutex mu_; bool acquired_; };