/* * * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/usage_timer.h" #include "test/cpp/util/create_test_channel.h" namespace grpc { namespace testing { class ClientRpcContext { public: ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, HistogramEntry* entry) = 0; virtual ClientRpcContext* StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast(c); } static ClientRpcContext* detag(void* t) { return reinterpret_cast(t); } virtual void Start(CompletionQueue* cq) = 0; }; template class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( BenchmarkService::Stub* stub, const RequestType& req, std::function next_issue, std::function< std::unique_ptr>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req, std::function on_done) : context_(), stub_(stub), cq_(nullptr), req_(req), response_(), next_state_(State::READY), callback_(on_done), next_issue_(next_issue), start_req_(start_req) {} ~ClientRpcContextUnaryImpl() override {} void Start(CompletionQueue* cq) override { cq_ = cq; if (!next_issue_) { // ready to issue RunNextState(true, nullptr); } else { // wait for the issue time alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); } } bool RunNextState(bool ok, HistogramEntry* entry) override { switch (next_state_) { case State::READY: start_ = UsageTimer::Now(); response_reader_ = start_req_(stub_, &context_, req_, cq_); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); next_state_ = State::RESP_DONE; return true; case State::RESP_DONE: entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_, entry); next_state_ = State::INVALID; return false; default: GPR_ASSERT(false); return false; } } ClientRpcContext* StartNewClone() override { return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_, callback_); } private: grpc::ClientContext context_; BenchmarkService::Stub* stub_; CompletionQueue* cq_; std::unique_ptr alarm_; RequestType req_; ResponseType response_; enum State { INVALID, READY, RESP_DONE }; State next_state_; std::function callback_; std::function next_issue_; std::function>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req_; grpc::Status status_; double start_; std::unique_ptr> response_reader_; }; typedef std::forward_list context_list; template class AsyncClient : public ClientImpl { // Specify which protected members we are using since there is no // member name resolution until the template types are fully resolved public: using Client::SetupLoadTest; using Client::closed_loop_; using Client::NextIssuer; using ClientImpl::cores_; using ClientImpl::channels_; using ClientImpl::request_; AsyncClient(const ClientConfig& config, std::function next_issue, const RequestType&)> setup_ctx, std::function(std::shared_ptr)> create_stub) : ClientImpl(config, create_stub), num_async_threads_(NumThreads(config)) { SetupLoadTest(config, num_async_threads_); for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); next_issuers_.emplace_back(NextIssuer(i)); shutdown_state_.emplace_back(new PerThreadShutdownState()); } using namespace std::placeholders; int t = 0; for (int ch = 0; ch < config.client_channels(); ch++) { for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { auto* cq = cli_cqs_[t].get(); auto ctx = setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_); ctx->Start(cq); } t = (t + 1) % cli_cqs_.size(); } } virtual ~AsyncClient() { for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { void* got_tag; bool ok; while ((*cq)->Next(&got_tag, &ok)) { delete ClientRpcContext::detag(got_tag); } } } protected: const int num_async_threads_; private: struct PerThreadShutdownState { mutable std::mutex mutex; bool shutdown; PerThreadShutdownState() : shutdown(false) {} }; int NumThreads(const ClientConfig& config) { int num_threads = config.async_client_threads(); if (num_threads <= 0) { // Use dynamic sizing num_threads = cores_; gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); } return num_threads; } void DestroyMultithreading() override final { for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { std::lock_guard lock((*ss)->mutex); (*ss)->shutdown = true; } for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { (*cq)->Shutdown(); } this->EndThreads(); // this needed for resolution } bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { void* got_tag; bool ok; switch (cli_cqs_[thread_idx]->AsyncNext( &got_tag, &ok, std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { case CompletionQueue::GOT_EVENT: { // Got a regular event, so process it ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down std::lock_guard l(shutdown_state_[thread_idx]->mutex); if (shutdown_state_[thread_idx]->shutdown) { delete ctx; return true; } else if (!ctx->RunNextState(ok, entry)) { // The RPC and callback are done, so clone the ctx // and kickstart the new one auto clone = ctx->StartNewClone(); clone->Start(cli_cqs_[thread_idx].get()); // delete the old version delete ctx; } return true; } case CompletionQueue::TIMEOUT: { std::lock_guard l(shutdown_state_[thread_idx]->mutex); if (shutdown_state_[thread_idx]->shutdown) { return true; } return true; } case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be // done return true; } GPR_UNREACHABLE_CODE(return true); } std::vector> cli_cqs_; std::vector> next_issuers_; std::vector> shutdown_state_; }; static std::unique_ptr BenchmarkStubCreator( std::shared_ptr ch) { return BenchmarkService::NewStub(ch); } class AsyncUnaryClient final : public AsyncClient { public: explicit AsyncUnaryClient(const ClientConfig& config) : AsyncClient( config, SetupCtx, BenchmarkStubCreator) { StartThreads(num_async_threads_); } ~AsyncUnaryClient() override {} private: static void CheckDone(grpc::Status s, SimpleResponse* response, HistogramEntry* entry) { entry->set_status(s.error_code()); } static std::unique_ptr> StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request, CompletionQueue* cq) { return stub->AsyncUnaryCall(ctx, request, cq); }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, std::function next_issue, const SimpleRequest& req) { return new ClientRpcContextUnaryImpl( stub, req, next_issue, AsyncUnaryClient::StartReq, AsyncUnaryClient::CheckDone); } }; template class ClientRpcContextStreamingImpl : public ClientRpcContext { public: ClientRpcContextStreamingImpl( BenchmarkService::Stub* stub, const RequestType& req, std::function next_issue, std::function>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> start_req, std::function on_done) : context_(), stub_(stub), cq_(nullptr), req_(req), response_(), next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), start_req_(start_req) {} ~ClientRpcContextStreamingImpl() override {} void Start(CompletionQueue* cq) override { cq_ = cq; stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } bool RunNextState(bool ok, HistogramEntry* entry) override { while (true) { switch (next_state_) { case State::STREAM_IDLE: if (!next_issue_) { // ready to issue next_state_ = State::READY_TO_WRITE; } else { next_state_ = State::WAIT; } break; // loop around, don't return case State::WAIT: alarm_.reset( new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); next_state_ = State::READY_TO_WRITE; return true; case State::READY_TO_WRITE: if (!ok) { return false; } start_ = UsageTimer::Now(); next_state_ = State::WRITE_DONE; stream_->Write(req_, ClientRpcContext::tag(this)); return true; case State::WRITE_DONE: if (!ok) { return false; } next_state_ = State::READ_DONE; stream_->Read(&response_, ClientRpcContext::tag(this)); return true; break; case State::READ_DONE: entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around default: GPR_ASSERT(false); return false; } } } ClientRpcContext* StartNewClone() override { return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_, start_req_, callback_); } private: grpc::ClientContext context_; BenchmarkService::Stub* stub_; CompletionQueue* cq_; std::unique_ptr alarm_; RequestType req_; ResponseType response_; enum State { INVALID, STREAM_IDLE, WAIT, READY_TO_WRITE, WRITE_DONE, READ_DONE }; State next_state_; std::function callback_; std::function next_issue_; std::function>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> start_req_; grpc::Status status_; double start_; std::unique_ptr> stream_; }; class AsyncStreamingClient final : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) : AsyncClient( config, SetupCtx, BenchmarkStubCreator) { StartThreads(num_async_threads_); } ~AsyncStreamingClient() override {} private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} static std::unique_ptr< grpc::ClientAsyncReaderWriter> StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, CompletionQueue* cq, void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, std::function next_issue, const SimpleRequest& req) { return new ClientRpcContextStreamingImpl( stub, req, next_issue, AsyncStreamingClient::StartReq, AsyncStreamingClient::CheckDone); } }; class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { public: ClientRpcContextGenericStreamingImpl( grpc::GenericStub* stub, const ByteBuffer& req, std::function next_issue, std::function( grpc::GenericStub*, grpc::ClientContext*, const grpc::string& method_name, CompletionQueue*, void*)> start_req, std::function on_done) : context_(), stub_(stub), cq_(nullptr), req_(req), response_(), next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), start_req_(start_req) {} ~ClientRpcContextGenericStreamingImpl() override {} void Start(CompletionQueue* cq) override { cq_ = cq; const grpc::string kMethodName( "/grpc.testing.BenchmarkService/StreamingCall"); stream_ = start_req_(stub_, &context_, kMethodName, cq, ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } bool RunNextState(bool ok, HistogramEntry* entry) override { while (true) { switch (next_state_) { case State::STREAM_IDLE: if (!next_issue_) { // ready to issue next_state_ = State::READY_TO_WRITE; } else { next_state_ = State::WAIT; } break; // loop around, don't return case State::WAIT: alarm_.reset( new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); next_state_ = State::READY_TO_WRITE; return true; case State::READY_TO_WRITE: if (!ok) { return false; } start_ = UsageTimer::Now(); next_state_ = State::WRITE_DONE; stream_->Write(req_, ClientRpcContext::tag(this)); return true; case State::WRITE_DONE: if (!ok) { return false; } next_state_ = State::READ_DONE; stream_->Read(&response_, ClientRpcContext::tag(this)); return true; break; case State::READ_DONE: entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around default: GPR_ASSERT(false); return false; } } } ClientRpcContext* StartNewClone() override { return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_, start_req_, callback_); } private: grpc::ClientContext context_; grpc::GenericStub* stub_; CompletionQueue* cq_; std::unique_ptr alarm_; ByteBuffer req_; ByteBuffer response_; enum State { INVALID, STREAM_IDLE, WAIT, READY_TO_WRITE, WRITE_DONE, READ_DONE }; State next_state_; std::function callback_; std::function next_issue_; std::function( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, CompletionQueue*, void*)> start_req_; grpc::Status status_; double start_; std::unique_ptr stream_; }; static std::unique_ptr GenericStubCreator( std::shared_ptr ch) { return std::unique_ptr(new grpc::GenericStub(ch)); } class GenericAsyncStreamingClient final : public AsyncClient { public: explicit GenericAsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, GenericStubCreator) { StartThreads(num_async_threads_); } ~GenericAsyncStreamingClient() override {} private: static void CheckDone(grpc::Status s, ByteBuffer* response) {} static std::unique_ptr StartReq( grpc::GenericStub* stub, grpc::ClientContext* ctx, const grpc::string& method_name, CompletionQueue* cq, void* tag) { auto stream = stub->Call(ctx, method_name, cq, tag); return stream; }; static ClientRpcContext* SetupCtx(grpc::GenericStub* stub, std::function next_issue, const ByteBuffer& req) { return new ClientRpcContextGenericStreamingImpl( stub, req, next_issue, GenericAsyncStreamingClient::StartReq, GenericAsyncStreamingClient::CheckDone); } }; std::unique_ptr CreateAsyncUnaryClient(const ClientConfig& args) { return std::unique_ptr(new AsyncUnaryClient(args)); } std::unique_ptr CreateAsyncStreamingClient(const ClientConfig& args) { return std::unique_ptr(new AsyncStreamingClient(args)); } std::unique_ptr CreateGenericAsyncStreamingClient( const ClientConfig& args) { return std::unique_ptr(new GenericAsyncStreamingClient(args)); } } // namespace testing } // namespace grpc