From b6df94ad220b9e48d306150184848db614cc3506 Mon Sep 17 00:00:00 2001 From: vjpai Date: Mon, 30 Nov 2015 15:52:50 -0800 Subject: [PATCH 01/12] WIP --- Makefile | 2 +- include/grpc++/support/byte_buffer.h | 3 + src/cpp/util/byte_buffer.cc | 6 ++ test/cpp/qps/client.h | 83 ++++++++++++------ test/cpp/qps/client_async.cc | 120 +++++++++++++++++++++++++-- test/cpp/qps/server.h | 3 +- 6 files changed, 179 insertions(+), 38 deletions(-) diff --git a/Makefile b/Makefile index 9d94ee85992..b48974379b7 100644 --- a/Makefile +++ b/Makefile @@ -3077,7 +3077,7 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/h2_full+poll_ping_pong_streaming_nosec_test || ( echo test h2_full+poll_ping_pong_streaming_nosec_test failed ; exit 1 ) $(E) "[RUN] Testing h2_full+poll_registered_call_nosec_test" $(Q) $(BINDIR)/$(CONFIG)/h2_full+poll_registered_call_nosec_test || ( echo test h2_full+poll_registered_call_nosec_test failed ; exit 1 ) - $(E) "[RUN] Testing h2_full+poll_request_with_flags_nosec_test" + $(E) "[RUN] Testing h2_full+poll_riequest_with_flags_nosec_test" $(Q) $(BINDIR)/$(CONFIG)/h2_full+poll_request_with_flags_nosec_test || ( echo test h2_full+poll_request_with_flags_nosec_test failed ; exit 1 ) $(E) "[RUN] Testing h2_full+poll_request_with_payload_nosec_test" $(Q) $(BINDIR)/$(CONFIG)/h2_full+poll_request_with_payload_nosec_test || ( echo test h2_full+poll_request_with_payload_nosec_test failed ; exit 1 ) diff --git a/include/grpc++/support/byte_buffer.h b/include/grpc++/support/byte_buffer.h index 9d19b077089..6f29bdfcd55 100644 --- a/include/grpc++/support/byte_buffer.h +++ b/include/grpc++/support/byte_buffer.h @@ -66,6 +66,9 @@ class ByteBuffer GRPC_FINAL { /// Buffer size in bytes. size_t Length() const; + /// Move contents from \a bbuf and clear \a bbuf + void MoveFrom(ByteBuffer* bbuf); + private: friend class SerializationTraits; diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index 755234d7e8b..4d73599542f 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -79,4 +79,10 @@ size_t ByteBuffer::Length() const { } } +void ByteBuffer::MoveFrom(ByteBuffer* bbuf) { + Clear(); // in case we already had something, but we shouldn't use this then + buffer_ = bbuf->buffer_; + bbuf->buffer_ = nullptr; +} + } // namespace grpc diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index f4400692fea..30a8030f513 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -66,36 +66,66 @@ namespace testing { typedef std::chrono::high_resolution_clock grpc_time_source; typedef std::chrono::time_point grpc_time; +namespace ClientRequestCreation { +template +void CreateRequest(RequestType *req, const PayloadConfig&) { + // this template must be specialized + // fail with an assertion rather than a compile-time + // check since these only happen at the beginning anyway + GPR_ASSERT(false); +} + +template <> +void CreateRequest(SimpleRequest *req, const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else if (payload_config.has_simple_params()) { + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(payload_config.simple_params().resp_size()); + req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE); + int size = payload_config.simple_params().req_size(); + std::unique_ptr body(new char[size]); + req->mutable_payload()->set_body(body.get(), size); + } else if (payload_config.has_complex_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else { + // default should be simple proto without payloads + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(0); + req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE); + } +} +template <> +void CreateRequest(ByteBuffer *req, const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + if (payload_config.req_size() > 0) { + std::unique_ptr buf(new char[payload_config.req_size()]); + gpr_slice_from_copied_buffer(buf.get(), payload_config.req_size()); + Slice slice(s, Slice::STEAL_REF); + std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); + req->MoveFrom(bbuf.get()); + } else { + GPR_ASSERT(false); // not appropriate for this specialization + } + } +} +} + +template class Client { public: - explicit Client(const ClientConfig& config) + Client(const ClientConfig& config, + std::function(std::shared_ptr)> create_stub) : channels_(config.client_channels()), + create_stub_(create_stub), timer_(new Timer), interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { channels_[i].init(config.server_targets(i % config.server_targets_size()), config); } - if (config.payload_config().has_bytebuf_params()) { - GPR_ASSERT(false); // not yet implemented - } else if (config.payload_config().has_simple_params()) { - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size( - config.payload_config().simple_params().resp_size()); - request_.mutable_payload()->set_type( - grpc::testing::PayloadType::COMPRESSABLE); - int size = config.payload_config().simple_params().req_size(); - std::unique_ptr body(new char[size]); - request_.mutable_payload()->set_body(body.get(), size); - } else if (config.payload_config().has_complex_params()) { - GPR_ASSERT(false); // not yet implemented - } else { - // default should be simple proto without payloads - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size(0); - request_.mutable_payload()->set_type( - grpc::testing::PayloadType::COMPRESSABLE); - } + + ClientRequestCreation::CreateRequest(&request_, config.payload_config()); } virtual ~Client() {} @@ -134,7 +164,7 @@ class Client { } protected: - SimpleRequest request_; + RequestType request_; bool closed_loop_; class ClientChannelInfo { @@ -154,16 +184,17 @@ class Client { target, config.security_params().server_host_override(), config.has_security_params(), !config.security_params().use_test_ca()); - stub_ = BenchmarkService::NewStub(channel_); + stub_ = create_stub_(channel_); } Channel* get_channel() { return channel_.get(); } - BenchmarkService::Stub* get_stub() { return stub_.get(); } + StubType* get_stub() { return stub_.get(); } private: std::shared_ptr channel_; - std::unique_ptr stub_; + std::unique_ptr stub_; }; std::vector channels_; + std::function(std::shared_ptr)> create_stub_; void StartThreads(size_t num_threads) { for (size_t i = 0; i < num_threads; i++) { @@ -306,7 +337,7 @@ class Client { size_t idx_; std::thread impl_; }; - + std::vector> threads_; std::unique_ptr timer_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9594179822d..c05774c4102 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -147,13 +147,14 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { typedef std::forward_list context_list; -class AsyncClient : public Client { +template +class AsyncClient : public Client { public: - explicit AsyncClient( + AsyncClient( const ClientConfig& config, - std::function setup_ctx) - : Client(config), + std::function setup_ctx, + std::function(std::shared_ptr)> create_stub) + : Client(config, create_stub), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), @@ -343,10 +344,10 @@ class AsyncClient : public Client { int pref_channel_inc_; }; -class AsyncUnaryClient GRPC_FINAL : public AsyncClient { +class AsyncUnaryClient GRPC_FINAL : public AsyncClient { public: explicit AsyncUnaryClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx) { + : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { StartThreads(config.async_client_threads()); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -437,10 +438,10 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_; }; -class AsyncStreamingClient GRPC_FINAL : public AsyncClient { +class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx) { + : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); @@ -467,12 +468,113 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { } }; +class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { + public: + ClientGenericRpcContextStreamingImpl( + int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, + std::function( + grpc::GenericStub*, grpc::ClientContext*, const grpc::string& method_name, + CompletionQueue*, void*)> start_req, + std::function on_done) + : ClientRpcContext(channel_id), + context_(), + stub_(stub), + req_(req), + response_(), + next_state_(&ClientGenericRpcContextStreamingImpl::ReqSent), + callback_(on_done), + start_req_(start_req), + start_(Timer::Now()) {} + ~ClientGenericRpcContextStreamingImpl() GRPC_OVERRIDE {} + bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + return (this->*next_state_)(ok, hist); + } + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientGenericRpcContextStreamingImpl(channel_id_, stub_, req_, + start_req_, callback_); + } + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + const grpc::string kMethodName("/grpc.testing.BenchmarkService/StreamingCall"); + stream_ = start_req_(stub_, &context_, kMethodName, cq, ClientRpcContext::tag(this)); + } + + private: + bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } + bool StartWrite(bool ok) { + if (!ok) { + return (false); + } + start_ = Timer::Now(); + next_state_ = &ClientGenericRpcContextStreamingImpl::WriteDone; + stream_->Write(req_, ClientRpcContext::tag(this)); + return true; + } + bool WriteDone(bool ok, Histogram*) { + if (!ok) { + return (false); + } + next_state_ = &ClientGenericRpcContextStreamingImpl::ReadDone; + stream_->Read(&response_, ClientRpcContext::tag(this)); + return true; + } + bool ReadDone(bool ok, Histogram* hist) { + hist->Add((Timer::Now() - start_) * 1e9); + return StartWrite(ok); + } + grpc::ClientContext context_; + grpc::GenericStub* stub_; + ByteBuffer req_; + ByteBuffer response_; + bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*); + std::function callback_; + std::function< + std::unique_ptr( + grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, CompletionQueue*, + void*)> start_req_; + grpc::Status status_; + double start_; + std::unique_ptr stream_; +}; + +class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient { + public: + explicit GenericAsyncStreamingClient(const ClientConfig& config) + : AsyncClient(config, SetupCtx, grpc::GenericStub) { + // async streaming currently only supports closed loop + GPR_ASSERT(closed_loop_); + + StartThreads(config.async_client_threads()); + } + + ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } + + private: + static void CheckDone(grpc::Status s, ByteBuffer* response) {} + static std::unique_ptr + StartReq(grpc::GenericStub* stub, grpc::ClientContext* ctx, + const grpc::string& method_name, CompletionQueue* cq, void* tag) { + auto stream = stub->Call(ctx, method_name, cq, tag); + return stream; + }; + static ClientRpcContext* SetupCtx(int channel_id, + grpc::GenericStub* stub, + const SimpleRequest& req) { + return new ClientRpcContextStreamingImpl( + channel_id, stub, req, AsyncStreamingClient::StartReq, + AsyncStreamingClient::CheckDone); + } +}; + std::unique_ptr CreateAsyncUnaryClient(const ClientConfig& args) { return std::unique_ptr(new AsyncUnaryClient(args)); } std::unique_ptr CreateAsyncStreamingClient(const ClientConfig& args) { return std::unique_ptr(new AsyncStreamingClient(args)); } +std::unique_ptr CreateGenericAsyncStreamingClient(const ClientConfig& args) { + return std::unique_ptr(new GenericAsyncStreamingClient(args)); +} } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 6e81edc8ffe..7c52443d4e4 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -75,12 +75,11 @@ class Server { } static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; // TODO(yangg): Support UNCOMPRESSABLE payload. if (type != PayloadType::COMPRESSABLE) { return false; } - payload->set_type(response_type); + payload->set_type(type); std::unique_ptr body(new char[size]()); payload->set_body(body.get(), size); return true; From e4886680752e9181e0d848c3046e4a8d1eddffa3 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 30 Dec 2015 11:56:19 -0800 Subject: [PATCH 02/12] WIP --- test/cpp/qps/client.h | 121 +++++++++++++++++++---------------- test/cpp/qps/client_async.cc | 23 +++++-- test/cpp/qps/client_sync.cc | 7 +- 3 files changed, 86 insertions(+), 65 deletions(-) diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index b11a83b570b..9a2894687dd 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -37,6 +37,9 @@ #include #include +#include +#include + #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" @@ -74,7 +77,7 @@ void CreateRequest(RequestType *req, const PayloadConfig&) { // check since these only happen at the beginning anyway GPR_ASSERT(false); } - + template <> void CreateRequest(SimpleRequest *req, const PayloadConfig& payload_config) { if (payload_config.has_bytebuf_params()) { @@ -96,11 +99,15 @@ void CreateRequest(SimpleRequest *req, const PayloadConfig& paylo } } template <> -void CreateRequest(ByteBuffer *req, const PayloadConfig& payload_config) { +void CreateRequest(ByteBuffer *req, + const PayloadConfig& payload_config) { if (payload_config.has_bytebuf_params()) { - if (payload_config.req_size() > 0) { - std::unique_ptr buf(new char[payload_config.req_size()]); - gpr_slice_from_copied_buffer(buf.get(), payload_config.req_size()); + if (payload_config.bytebuf_params().req_size() > 0) { + std::unique_ptr + buf(new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = + gpr_slice_from_copied_buffer(buf.get(), + payload_config.bytebuf_params().req_size()); Slice slice(s, Slice::STEAL_REF); std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); req->MoveFrom(bbuf.get()); @@ -110,24 +117,11 @@ void CreateRequest(ByteBuffer *req, const PayloadConfig& payload_con } } } - -template + class Client { public: - Client(const ClientConfig& config, - std::function(std::shared_ptr)> create_stub) - : channels_(config.client_channels()), - create_stub_(create_stub), - timer_(new Timer), - interarrival_timer_() { - for (int i = 0; i < config.client_channels(); i++) { - channels_[i].init(config.server_targets(i % config.server_targets_size()), - config); - } - - ClientRequestCreation::CreateRequest(&request_, config.payload_config()); - } - virtual ~Client() {} + Client() : timer_(new Timer), interarrival_timer_() {} + virtual ~Client(); ClientStats Mark(bool reset) { Histogram latencies; @@ -162,40 +156,9 @@ class Client { stats.set_time_user(timer_result.user); return stats; } - protected: - RequestType request_; bool closed_loop_; - class ClientChannelInfo { - public: - ClientChannelInfo() {} - ClientChannelInfo(const ClientChannelInfo& i) { - // The copy constructor is to satisfy old compilers - // that need it for using std::vector . It is only ever - // used for empty entries - GPR_ASSERT(!i.channel_ && !i.stub_); - } - void init(const grpc::string& target, const ClientConfig& config) { - // We have to use a 2-phase init like this with a default - // constructor followed by an initializer function to make - // old compilers happy with using this in std::vector - channel_ = CreateTestChannel( - target, config.security_params().server_host_override(), - config.has_security_params(), - !config.security_params().use_test_ca()); - stub_ = create_stub_(channel_); - } - Channel* get_channel() { return channel_.get(); } - StubType* get_stub() { return stub_.get(); } - - private: - std::shared_ptr channel_; - std::unique_ptr stub_; - }; - std::vector channels_; - std::function(std::shared_ptr)> create_stub_; - void StartThreads(size_t num_threads) { for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); @@ -264,7 +227,6 @@ class Client { return true; } } - private: class Thread { public: @@ -326,8 +288,6 @@ class Client { } } - BenchmarkService::Stub* stub_; - ClientConfig config_; std::mutex mu_; std::condition_variable cv_; bool done_; @@ -337,7 +297,7 @@ class Client { size_t idx_; std::thread impl_; }; - + std::vector> threads_; std::unique_ptr timer_; @@ -345,6 +305,55 @@ class Client { std::vector next_time_; }; +template +class ClientImpl : public Client { + public: + ClientImpl(const ClientConfig& config, + std::function(std::shared_ptr)> create_stub) + : channels_(config.client_channels()), + create_stub_(create_stub) { + for (int i = 0; i < config.client_channels(); i++) { + channels_[i].init(config.server_targets(i % config.server_targets_size()), + config); + } + + ClientRequestCreation::CreateRequest(&request_, config.payload_config()); + } + virtual ~ClientImpl() {} + + protected: + RequestType request_; + + class ClientChannelInfo { + public: + ClientChannelInfo() {} + ClientChannelInfo(const ClientChannelInfo& i) { + // The copy constructor is to satisfy old compilers + // that need it for using std::vector . It is only ever + // used for empty entries + GPR_ASSERT(!i.channel_ && !i.stub_); + } + void init(const grpc::string& target, const ClientConfig& config) { + // We have to use a 2-phase init like this with a default + // constructor followed by an initializer function to make + // old compilers happy with using this in std::vector + channel_ = CreateTestChannel( + target, config.security_params().server_host_override(), + config.has_security_params(), + !config.security_params().use_test_ca()); + stub_ = create_stub_(channel_); + } + Channel* get_channel() { return channel_.get(); } + StubType* get_stub() { return stub_.get(); } + + private: + std::shared_ptr channel_; + std::unique_ptr stub_; + }; + std::vector channels_; + std::function(std::shared_ptr)> create_stub_; +}; + std::unique_ptr CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr CreateSynchronousStreamingClient( const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index c05774c4102..fdfe1a567ae 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -47,6 +47,7 @@ #include #include #include +#include #include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" @@ -148,13 +149,20 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { typedef std::forward_list context_list; template -class AsyncClient : public Client { +class AsyncClient : public ClientImpl { + // Specify which protected members we are using since there is no + // member name resolution until the template types are fully resolved public: + using Client::SetupLoadTest; + using Client::NextIssueTime; + using Client::closed_loop_; + using ClientImpl::channels_; + using ClientImpl::request_; AsyncClient( const ClientConfig& config, std::function setup_ctx, std::function(std::shared_ptr)> create_stub) - : Client(config, create_stub), + : ClientImpl(config, create_stub), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), @@ -344,7 +352,8 @@ class AsyncClient : public Client { int pref_channel_inc_; }; -class AsyncUnaryClient GRPC_FINAL : public AsyncClient { +class AsyncUnaryClient GRPC_FINAL : + public AsyncClient { public: explicit AsyncUnaryClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { @@ -559,10 +568,10 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient( - channel_id, stub, req, AsyncStreamingClient::StartReq, - AsyncStreamingClient::CheckDone); + const ByteBuffer& req) { + return new ClientGenericRpcContextStreamingImpl( + channel_id, stub, req, GenericAsyncStreamingClient::StartReq, + GenericAsyncStreamingClient::CheckDone); } }; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 10d680860a9..409fc269723 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -64,9 +64,12 @@ namespace grpc { namespace testing { -class SynchronousClient : public Client { +class SynchronousClient : + public ClientImpl { public: - SynchronousClient(const ClientConfig& config) : Client(config) { + SynchronousClient(const ClientConfig& config) : + ClientImpl(config, BenchmarkService::NewStub) { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); From 18c0477528169c2032a57b5da094964a6d4beb2f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 4 Jan 2016 09:52:10 -0800 Subject: [PATCH 03/12] Got rid of all compilation problems, now need to test --- include/grpc++/support/byte_buffer.h | 5 +- src/cpp/util/byte_buffer.cc | 4 + test/cpp/qps/client.h | 115 +++++++++++++++------------ test/cpp/qps/client_async.cc | 101 +++++++++++++---------- test/cpp/qps/client_sync.cc | 25 +++--- 5 files changed, 148 insertions(+), 102 deletions(-) diff --git a/include/grpc++/support/byte_buffer.h b/include/grpc++/support/byte_buffer.h index 6f29bdfcd55..3adf86c4794 100644 --- a/include/grpc++/support/byte_buffer.h +++ b/include/grpc++/support/byte_buffer.h @@ -55,6 +55,10 @@ class ByteBuffer GRPC_FINAL { /// Construct buffer from \a slices, of which there are \a nslices. ByteBuffer(const Slice* slices, size_t nslices); + /// Constuct a byte buffer by referencing elements of existing buffer + /// \a buf. Wrapper of core function grpc_byte_buffer_copy + ByteBuffer(const ByteBuffer&buf); + ~ByteBuffer(); /// Dump (read) the buffer contents into \a slices. @@ -72,7 +76,6 @@ class ByteBuffer GRPC_FINAL { private: friend class SerializationTraits; - ByteBuffer(const ByteBuffer&); ByteBuffer& operator=(const ByteBuffer&); // takes ownership diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index 4d73599542f..467f0007de7 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -85,4 +85,8 @@ void ByteBuffer::MoveFrom(ByteBuffer* bbuf) { bbuf->buffer_ = nullptr; } +ByteBuffer::ByteBuffer(const ByteBuffer& buf): + buffer_(grpc_byte_buffer_copy(buf.buffer_)) { +} + } // namespace grpc diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 9a2894687dd..1a0a53d23b4 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -69,59 +69,68 @@ namespace testing { typedef std::chrono::high_resolution_clock grpc_time_source; typedef std::chrono::time_point grpc_time; -namespace ClientRequestCreation { template -void CreateRequest(RequestType *req, const PayloadConfig&) { - // this template must be specialized - // fail with an assertion rather than a compile-time - // check since these only happen at the beginning anyway - GPR_ASSERT(false); -} +class ClientRequestCreator { + public: + ClientRequestCreator(RequestType* req, const PayloadConfig&) { + // this template must be specialized + // fail with an assertion rather than a compile-time + // check since these only happen at the beginning anyway + GPR_ASSERT(false); + } +}; template <> -void CreateRequest(SimpleRequest *req, const PayloadConfig& payload_config) { - if (payload_config.has_bytebuf_params()) { - GPR_ASSERT(false); // not appropriate for this specialization - } else if (payload_config.has_simple_params()) { - req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - req->set_response_size(payload_config.simple_params().resp_size()); - req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE); - int size = payload_config.simple_params().req_size(); - std::unique_ptr body(new char[size]); - req->mutable_payload()->set_body(body.get(), size); - } else if (payload_config.has_complex_params()) { - GPR_ASSERT(false); // not appropriate for this specialization - } else { - // default should be simple proto without payloads - req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - req->set_response_size(0); - req->mutable_payload()->set_type(grpc::testing::PayloadType::COMPRESSABLE); +class ClientRequestCreator { + public: + ClientRequestCreator(SimpleRequest* req, + const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else if (payload_config.has_simple_params()) { + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(payload_config.simple_params().resp_size()); + req->mutable_payload()->set_type( + grpc::testing::PayloadType::COMPRESSABLE); + int size = payload_config.simple_params().req_size(); + std::unique_ptr body(new char[size]); + req->mutable_payload()->set_body(body.get(), size); + } else if (payload_config.has_complex_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else { + // default should be simple proto without payloads + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(0); + req->mutable_payload()->set_type( + grpc::testing::PayloadType::COMPRESSABLE); + } } -} +}; + template <> -void CreateRequest(ByteBuffer *req, - const PayloadConfig& payload_config) { - if (payload_config.has_bytebuf_params()) { - if (payload_config.bytebuf_params().req_size() > 0) { - std::unique_ptr - buf(new char[payload_config.bytebuf_params().req_size()]); - gpr_slice s = - gpr_slice_from_copied_buffer(buf.get(), - payload_config.bytebuf_params().req_size()); - Slice slice(s, Slice::STEAL_REF); - std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); - req->MoveFrom(bbuf.get()); - } else { - GPR_ASSERT(false); // not appropriate for this specialization +class ClientRequestCreator { + public: + ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + if (payload_config.bytebuf_params().req_size() > 0) { + std::unique_ptr buf( + new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = gpr_slice_from_copied_buffer( + buf.get(), payload_config.bytebuf_params().req_size()); + Slice slice(s, Slice::STEAL_REF); + std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); + req->MoveFrom(bbuf.get()); + } else { + GPR_ASSERT(false); // not appropriate for this specialization + } } } -} -} +}; class Client { public: Client() : timer_(new Timer), interarrival_timer_() {} - virtual ~Client(); + virtual ~Client() {} ClientStats Mark(bool reset) { Histogram latencies; @@ -156,6 +165,7 @@ class Client { stats.set_time_user(timer_result.user); return stats; } + protected: bool closed_loop_; @@ -227,6 +237,7 @@ class Client { return true; } } + private: class Thread { public: @@ -309,15 +320,16 @@ template class ClientImpl : public Client { public: ClientImpl(const ClientConfig& config, - std::function(std::shared_ptr)> create_stub) - : channels_(config.client_channels()), - create_stub_(create_stub) { + std::function(std::shared_ptr)> + create_stub) + : channels_(config.client_channels()), create_stub_(create_stub) { for (int i = 0; i < config.client_channels(); i++) { channels_[i].init(config.server_targets(i % config.server_targets_size()), - config); + config, create_stub_); } - ClientRequestCreation::CreateRequest(&request_, config.payload_config()); + ClientRequestCreator create_req(&request_, + config.payload_config()); } virtual ~ClientImpl() {} @@ -333,7 +345,9 @@ class ClientImpl : public Client { // used for empty entries GPR_ASSERT(!i.channel_ && !i.stub_); } - void init(const grpc::string& target, const ClientConfig& config) { + void init(const grpc::string& target, const ClientConfig& config, + std::function(std::shared_ptr)> + create_stub) { // We have to use a 2-phase init like this with a default // constructor followed by an initializer function to make // old compilers happy with using this in std::vector @@ -341,7 +355,7 @@ class ClientImpl : public Client { target, config.security_params().server_host_override(), config.has_security_params(), !config.security_params().use_test_ca()); - stub_ = create_stub_(channel_); + stub_ = create_stub(channel_); } Channel* get_channel() { return channel_.get(); } StubType* get_stub() { return stub_.get(); } @@ -351,7 +365,8 @@ class ClientImpl : public Client { std::unique_ptr stub_; }; std::vector channels_; - std::function(std::shared_ptr)> create_stub_; + std::function(const std::shared_ptr&)> + create_stub_; }; std::unique_ptr CreateSynchronousUnaryClient(const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index fdfe1a567ae..087ea75bf40 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -37,20 +37,20 @@ #include #include #include +#include #include #include #include -#include -#include -#include -#include #include #include #include +#include +#include +#include -#include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" +#include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" #include "test/proto/benchmarks/services.grpc.pb.h" @@ -93,7 +93,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function< std::unique_ptr>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req, + CompletionQueue*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -139,7 +140,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function callback_; std::function>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req_; + CompletionQueue*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -156,13 +158,15 @@ class AsyncClient : public ClientImpl { using Client::SetupLoadTest; using Client::NextIssueTime; using Client::closed_loop_; - using ClientImpl::channels_; - using ClientImpl::request_; + using ClientImpl::channels_; + using ClientImpl::request_; AsyncClient( const ClientConfig& config, - std::function setup_ctx, - std::function(std::shared_ptr)> create_stub) - : ClientImpl(config, create_stub), + std::function + setup_ctx, + std::function(std::shared_ptr)> + create_stub) + : ClientImpl(config, create_stub), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), @@ -352,11 +356,16 @@ class AsyncClient : public ClientImpl { int pref_channel_inc_; }; -class AsyncUnaryClient GRPC_FINAL : - public AsyncClient { +static std::unique_ptr BenchmarkStubCreator( + std::shared_ptr ch) { + return BenchmarkService::NewStub(ch); +} + +class AsyncUnaryClient GRPC_FINAL + : public AsyncClient { public: explicit AsyncUnaryClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { StartThreads(config.async_client_threads()); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -385,7 +394,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req, + void*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -437,20 +447,21 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function callback_; - std::function< - std::unique_ptr>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req_; + std::function>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> stream_; }; -class AsyncStreamingClient GRPC_FINAL : public AsyncClient { +class AsyncStreamingClient GRPC_FINAL + : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, BenchmarkService::NewStub) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); @@ -481,10 +492,10 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { public: ClientGenericRpcContextStreamingImpl( int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, - std::function( - grpc::GenericStub*, grpc::ClientContext*, const grpc::string& method_name, - CompletionQueue*, void*)> start_req, + std::function( + grpc::GenericStub*, grpc::ClientContext*, + const grpc::string& method_name, CompletionQueue*, void*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -501,11 +512,13 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { return new ClientGenericRpcContextStreamingImpl(channel_id_, stub_, req_, - start_req_, callback_); + start_req_, callback_); } void Start(CompletionQueue* cq) GRPC_OVERRIDE { - const grpc::string kMethodName("/grpc.testing.BenchmarkService/StreamingCall"); - stream_ = start_req_(stub_, &context_, kMethodName, cq, ClientRpcContext::tag(this)); + const grpc::string kMethodName( + "/grpc.testing.BenchmarkService/StreamingCall"); + stream_ = start_req_(stub_, &context_, kMethodName, cq, + ClientRpcContext::tag(this)); } private: @@ -537,19 +550,25 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { ByteBuffer response_; bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function callback_; - std::function< - std::unique_ptr( - grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, CompletionQueue*, - void*)> start_req_; + std::function( + grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, + CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr stream_; }; -class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient { +static std::unique_ptr GenericStubCreator( + std::shared_ptr ch) { + return std::unique_ptr(new grpc::GenericStub(ch)); +} + +class GenericAsyncStreamingClient GRPC_FINAL + : public AsyncClient { public: explicit GenericAsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx, grpc::GenericStub) { + : AsyncClient(config, SetupCtx, GenericStubCreator) { // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); @@ -560,14 +579,13 @@ class GenericAsyncStreamingClient GRPC_FINAL : public AsyncClient - StartReq(grpc::GenericStub* stub, grpc::ClientContext* ctx, - const grpc::string& method_name, CompletionQueue* cq, void* tag) { + static std::unique_ptr StartReq( + grpc::GenericStub* stub, grpc::ClientContext* ctx, + const grpc::string& method_name, CompletionQueue* cq, void* tag) { auto stream = stub->Call(ctx, method_name, cq, tag); return stream; }; - static ClientRpcContext* SetupCtx(int channel_id, - grpc::GenericStub* stub, + static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, const ByteBuffer& req) { return new ClientGenericRpcContextStreamingImpl( channel_id, stub, req, GenericAsyncStreamingClient::StartReq, @@ -581,7 +599,8 @@ std::unique_ptr CreateAsyncUnaryClient(const ClientConfig& args) { std::unique_ptr CreateAsyncStreamingClient(const ClientConfig& args) { return std::unique_ptr(new AsyncStreamingClient(args)); } -std::unique_ptr CreateGenericAsyncStreamingClient(const ClientConfig& args) { +std::unique_ptr CreateGenericAsyncStreamingClient( + const ClientConfig& args) { return std::unique_ptr(new GenericAsyncStreamingClient(args)); } diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 409fc269723..c27ca7a623f 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -35,28 +35,28 @@ #include #include #include +#include #include #include #include -#include #include +#include +#include +#include #include #include #include #include #include #include -#include -#include -#include #include -#include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" +#include "test/cpp/util/create_test_channel.h" #include "test/proto/benchmarks/services.grpc.pb.h" #include "src/core/profiling/timers.h" @@ -64,12 +64,17 @@ namespace grpc { namespace testing { -class SynchronousClient : - public ClientImpl { +static std::unique_ptr BenchmarkStubCreator( + std::shared_ptr ch) { + return BenchmarkService::NewStub(ch); +} + +class SynchronousClient + : public ClientImpl { public: - SynchronousClient(const ClientConfig& config) : - ClientImpl(config, BenchmarkService::NewStub) { + SynchronousClient(const ClientConfig& config) + : ClientImpl( + config, BenchmarkStubCreator) { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); From de332dfcac51e080e4c294d183906e5969672133 Mon Sep 17 00:00:00 2001 From: vjpai Date: Thu, 7 Jan 2016 10:20:46 -0800 Subject: [PATCH 04/12] Refactor server side to support generic tests. --- .../grpc++/generic/async_generic_service.h | 1 + include/grpc++/support/byte_buffer.h | 4 +- src/cpp/util/byte_buffer.cc | 6 + test/cpp/qps/server_async.cc | 108 ++++++++++++------ 4 files changed, 79 insertions(+), 40 deletions(-) diff --git a/include/grpc++/generic/async_generic_service.h b/include/grpc++/generic/async_generic_service.h index 8578d850ffd..f209c077e6c 100644 --- a/include/grpc++/generic/async_generic_service.h +++ b/include/grpc++/generic/async_generic_service.h @@ -61,6 +61,7 @@ class AsyncGenericService GRPC_FINAL { // TODO(yangg) Once we can add multiple completion queues to the server // in c core, add a CompletionQueue* argument to the ctor here. // TODO(yangg) support methods list. + AsyncGenericService() : server_(nullptr) {} AsyncGenericService(const grpc::string& methods) : server_(nullptr) {} void RequestCall(GenericServerContext* ctx, diff --git a/include/grpc++/support/byte_buffer.h b/include/grpc++/support/byte_buffer.h index 3adf86c4794..a089dbb4cb5 100644 --- a/include/grpc++/support/byte_buffer.h +++ b/include/grpc++/support/byte_buffer.h @@ -61,6 +61,8 @@ class ByteBuffer GRPC_FINAL { ~ByteBuffer(); + ByteBuffer& operator=(const ByteBuffer&); + /// Dump (read) the buffer contents into \a slices. void Dump(std::vector* slices) const; @@ -76,8 +78,6 @@ class ByteBuffer GRPC_FINAL { private: friend class SerializationTraits; - ByteBuffer& operator=(const ByteBuffer&); - // takes ownership void set_buffer(grpc_byte_buffer* buf) { if (buffer_) { diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index 467f0007de7..91e202023c5 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -89,4 +89,10 @@ ByteBuffer::ByteBuffer(const ByteBuffer& buf): buffer_(grpc_byte_buffer_copy(buf.buffer_)) { } +ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { + Clear(); // first remove existing data + buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy + return *this; +} + } // namespace grpc diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c151918ce4c..85a47ff71d8 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -55,9 +56,15 @@ namespace grpc { namespace testing { +template class AsyncQpsServerTest : public Server { public: - explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) { + AsyncQpsServerTest(const ServerConfig &config, + std::function register_service, + std::function*, CompletionQueue *, ServerCompletionQueue *, void *)> request_unary_function, + std::function*, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, + std::function process_rpc) + : Server(config) { char *server_address = NULL; gpr_join_host_port(&server_address, config.host().c_str(), port()); @@ -67,7 +74,8 @@ class AsyncQpsServerTest : public Server { Server::CreateServerCredentials(config)); gpr_free(server_address); - builder.RegisterAsyncService(&async_service_); + register_service(&builder, &async_service_); + for (int i = 0; i < config.async_server_threads(); i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } @@ -75,22 +83,27 @@ class AsyncQpsServerTest : public Server { server_ = builder.BuildAndStart(); using namespace std::placeholders; + + auto process_rpc_bound = std::bind(process_rpc, config, _1, _2); + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { - auto request_unary = std::bind( - &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_, - _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); - auto request_streaming = std::bind( - &BenchmarkService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); - contexts_.push_front( - new ServerRpcContextUnaryImpl( - request_unary, ProcessRPC)); - contexts_.push_front( - new ServerRpcContextStreamingImpl( - request_streaming, ProcessRPC)); + if (request_unary_function) { + auto request_unary = std::bind( + request_unary_function, &async_service_, + _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + contexts_.push_front(new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); + } + if (request_streaming_function) { + auto request_streaming = std::bind( + request_streaming_function, + &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front(new ServerRpcContextStreamingImpl( + request_streaming, process_rpc_bound)); + } } } + for (int i = 0; i < config.async_server_threads(); i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } @@ -155,16 +168,15 @@ class AsyncQpsServerTest : public Server { return reinterpret_cast(tag); } - template class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextUnaryImpl( - std::function *, void *)> request_method, std::function invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), request_method_(request_method), invoke_method_(invoke_method), @@ -177,7 +189,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); response_writer_ = grpc::ServerAsyncResponseWriter(srv_ctx_.get()); @@ -205,10 +217,10 @@ class AsyncQpsServerTest : public Server { response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this)); return true; } - std::unique_ptr srv_ctx_; + std::unique_ptr srv_ctx_; RequestType req_; bool (ServerRpcContextUnaryImpl::*next_state_)(bool); - std::function *, void *)> request_method_; std::function @@ -216,16 +228,15 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncResponseWriter response_writer_; }; - template class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function *, void *)> request_method, std::function invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), request_method_(request_method), invoke_method_(invoke_method), @@ -237,7 +248,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); stream_ = grpc::ServerAsyncReaderWriter( srv_ctx_.get()); @@ -286,11 +297,11 @@ class AsyncQpsServerTest : public Server { } bool finish_done(bool ok) { return false; /* reset the context */ } - std::unique_ptr srv_ctx_; + std::unique_ptr srv_ctx_; RequestType req_; bool (ServerRpcContextStreamingImpl::*next_state_)(bool); std::function *, void *)> request_method_; std::function @@ -298,20 +309,10 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncReaderWriter stream_; }; - static Status ProcessRPC(const SimpleRequest *request, - SimpleResponse *response) { - if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } - } - return Status::OK; - } std::vector threads_; std::unique_ptr server_; std::vector> srv_cqs_; - BenchmarkService::AsyncService async_service_; + ServiceType async_service_; std::forward_list contexts_; class PerThreadShutdownState { @@ -335,8 +336,39 @@ class AsyncQpsServerTest : public Server { std::vector> shutdown_state_; }; +static void RegisterBenchmarkService(ServerBuilder *builder, + BenchmarkService::AsyncService *service) { + builder->RegisterAsyncService(service); +} +static void RegisterGenericService(ServerBuilder *builder, + grpc::AsyncGenericService *service) { + builder->RegisterAsyncGenericService(service); +} + +template +Status ProcessRPC(const ServerConfig &config, const RequestType *request, + ResponseType *response) { + if (request->response_size() > 0) { + if (!Server::SetPayload(request->response_type(), request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; +} + +template<> +Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request, + ByteBuffer *response) { + return Status::OK; +} + + std::unique_ptr CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr(new AsyncQpsServerTest(config)); + return std::unique_ptr(new AsyncQpsServerTest(config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, ProcessRPC)); +} +std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { + return std::unique_ptr(new AsyncQpsServerTest(config, RegisterGenericService, nullptr, &grpc::AsyncGenericService::RequestCall, ProcessRPC)); } } // namespace testing From 31c0c7949fdd0e0cc6b370e268e3d578b71c6aab Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 7 Jan 2016 11:43:46 -0800 Subject: [PATCH 05/12] clang-format --- test/cpp/qps/server_async.cc | 106 +++++++++++++++++++++-------------- 1 file changed, 65 insertions(+), 41 deletions(-) diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 85a47ff71d8..b1e393dd40b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -38,16 +38,16 @@ #include #include -#include -#include -#include -#include #include -#include +#include #include #include #include -#include +#include +#include +#include +#include +#include #include #include "test/cpp/qps/server.h" @@ -56,15 +56,25 @@ namespace grpc { namespace testing { -template +template class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig &config, - std::function register_service, - std::function*, CompletionQueue *, ServerCompletionQueue *, void *)> request_unary_function, - std::function*, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, - std::function process_rpc) - : Server(config) { + AsyncQpsServerTest( + const ServerConfig &config, + std::function register_service, + std::function *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_unary_function, + std::function *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_function, + std::function + process_rpc) + : Server(config) { char *server_address = NULL; gpr_join_host_port(&server_address, config.host().c_str(), port()); @@ -85,22 +95,23 @@ class AsyncQpsServerTest : public Server { using namespace std::placeholders; auto process_rpc_bound = std::bind(process_rpc, config, _1, _2); - + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { - if (request_unary_function) { - auto request_unary = std::bind( - request_unary_function, &async_service_, - _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); - contexts_.push_front(new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); - } - if (request_streaming_function) { - auto request_streaming = std::bind( - request_streaming_function, - &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); - contexts_.push_front(new ServerRpcContextStreamingImpl( - request_streaming, process_rpc_bound)); - } + if (request_unary_function) { + auto request_unary = + std::bind(request_unary_function, &async_service_, _1, _2, _3, + srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + contexts_.push_front( + new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); + } + if (request_streaming_function) { + auto request_streaming = + std::bind(request_streaming_function, &async_service_, _1, _2, + srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front(new ServerRpcContextStreamingImpl( + request_streaming, process_rpc_bound)); + } } } @@ -173,7 +184,8 @@ class AsyncQpsServerTest : public Server { ServerRpcContextUnaryImpl( std::function *, - void *)> request_method, + void *)> + request_method, std::function invoke_method) : srv_ctx_(new ServerContextType), @@ -231,9 +243,10 @@ class AsyncQpsServerTest : public Server { class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function *, - void *)> request_method, + std::function *, void *)> + request_method, std::function invoke_method) : srv_ctx_(new ServerContextType), @@ -337,38 +350,49 @@ class AsyncQpsServerTest : public Server { }; static void RegisterBenchmarkService(ServerBuilder *builder, - BenchmarkService::AsyncService *service) { + BenchmarkService::AsyncService *service) { builder->RegisterAsyncService(service); } static void RegisterGenericService(ServerBuilder *builder, - grpc::AsyncGenericService *service) { + grpc::AsyncGenericService *service) { builder->RegisterAsyncGenericService(service); } -template +template Status ProcessRPC(const ServerConfig &config, const RequestType *request, - ResponseType *response) { + ResponseType *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { + response->mutable_payload())) { return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } return Status::OK; } -template<> +template <> Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request, - ByteBuffer *response) { + ByteBuffer *response) { return Status::OK; } - std::unique_ptr CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr(new AsyncQpsServerTest(config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, ProcessRPC)); + return std::unique_ptr( + new AsyncQpsServerTest( + config, RegisterBenchmarkService, + &BenchmarkService::AsyncService::RequestUnaryCall, + &BenchmarkService::AsyncService::RequestStreamingCall, + ProcessRPC)); } std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { - return std::unique_ptr(new AsyncQpsServerTest(config, RegisterGenericService, nullptr, &grpc::AsyncGenericService::RequestCall, ProcessRPC)); + return std::unique_ptr( + new AsyncQpsServerTest( + config, RegisterGenericService, nullptr, + &grpc::AsyncGenericService::RequestCall, + ProcessRPC)); } } // namespace testing From 93beeb889531fde79e14f65e270a595bfb8dc8d4 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 7 Jan 2016 12:12:40 -0800 Subject: [PATCH 06/12] Fix one use of grpc instead of gpr --- include/grpc++/support/slice.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/grpc++/support/slice.h b/include/grpc++/support/slice.h index 456379cc5be..88e08bedb53 100644 --- a/include/grpc++/support/slice.h +++ b/include/grpc++/support/slice.h @@ -39,7 +39,7 @@ namespace grpc { -/// A wrapper around \a grpc_slice. +/// A wrapper around \a gpr_slice. /// /// A slice represents a contiguous reference counted array of bytes. /// It is cheap to take references to a slice, and it is cheap to create a From 9f991e252dd1a5e58bc7d5be35d8bada883cab5a Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 7 Jan 2016 14:08:09 -0800 Subject: [PATCH 07/12] More changes needed for generic support --- test/cpp/qps/client.h | 5 +++-- test/cpp/qps/client_async.cc | 18 +++++++++--------- test/cpp/qps/server_async.cc | 27 +++++++++++++++++---------- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 1a0a53d23b4..1b5a3d4a078 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -118,8 +118,9 @@ class ClientRequestCreator { gpr_slice s = gpr_slice_from_copied_buffer( buf.get(), payload_config.bytebuf_params().req_size()); Slice slice(s, Slice::STEAL_REF); - std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); - req->MoveFrom(bbuf.get()); + *req = ByteBuffer(&slice, 1); + // std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); + // req->MoveFrom(bbuf.get()); } else { GPR_ASSERT(false); // not appropriate for this specialization } diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 087ea75bf40..553c97fd684 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -488,9 +488,9 @@ class AsyncStreamingClient GRPC_FINAL } }; -class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { +class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { public: - ClientGenericRpcContextStreamingImpl( + ClientRpcContextGenericStreamingImpl( int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, std::function( grpc::GenericStub*, grpc::ClientContext*, @@ -502,16 +502,16 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { stub_(stub), req_(req), response_(), - next_state_(&ClientGenericRpcContextStreamingImpl::ReqSent), + next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent), callback_(on_done), start_req_(start_req), start_(Timer::Now()) {} - ~ClientGenericRpcContextStreamingImpl() GRPC_OVERRIDE {} + ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { return (this->*next_state_)(ok, hist); } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientGenericRpcContextStreamingImpl(channel_id_, stub_, req_, + return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_, start_req_, callback_); } void Start(CompletionQueue* cq) GRPC_OVERRIDE { @@ -528,7 +528,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { return (false); } start_ = Timer::Now(); - next_state_ = &ClientGenericRpcContextStreamingImpl::WriteDone; + next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone; stream_->Write(req_, ClientRpcContext::tag(this)); return true; } @@ -536,7 +536,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { if (!ok) { return (false); } - next_state_ = &ClientGenericRpcContextStreamingImpl::ReadDone; + next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone; stream_->Read(&response_, ClientRpcContext::tag(this)); return true; } @@ -548,7 +548,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext { grpc::GenericStub* stub_; ByteBuffer req_; ByteBuffer response_; - bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*); + bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*); std::function callback_; std::function( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, @@ -587,7 +587,7 @@ class GenericAsyncStreamingClient GRPC_FINAL }; static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, const ByteBuffer& req) { - return new ClientGenericRpcContextStreamingImpl( + return new ClientRpcContextGenericStreamingImpl( channel_id, stub, req, GenericAsyncStreamingClient::StartReq, GenericAsyncStreamingClient::CheckDone); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b1e393dd40b..4932271273e 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -71,7 +71,7 @@ class AsyncQpsServerTest : public Server { ServerAsyncReaderWriter *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, - std::function process_rpc) : Server(config) { @@ -94,7 +94,8 @@ class AsyncQpsServerTest : public Server { using namespace std::placeholders; - auto process_rpc_bound = std::bind(process_rpc, config, _1, _2); + auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), + _1, _2); for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { @@ -358,9 +359,10 @@ static void RegisterGenericService(ServerBuilder *builder, builder->RegisterAsyncGenericService(service); } -template -Status ProcessRPC(const ServerConfig &config, const RequestType *request, - ResponseType *response) { + +static Status ProcessSimpleRPC(const PayloadConfig&, + const SimpleRequest *request, + SimpleResponse *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), response->mutable_payload())) { @@ -370,9 +372,14 @@ Status ProcessRPC(const ServerConfig &config, const RequestType *request, return Status::OK; } -template <> -Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request, - ByteBuffer *response) { +static Status ProcessGenericRPC(const PayloadConfig& payload_config, + const ByteBuffer *request, + ByteBuffer *response) { + int resp_size = payload_config.bytebuf_params().resp_size(); + std::unique_ptr buf(new char[resp_size]); + gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size); + Slice slice(s, Slice::STEAL_REF); + *response = ByteBuffer(&slice, 1); return Status::OK; } @@ -384,7 +391,7 @@ std::unique_ptr CreateAsyncServer(const ServerConfig &config) { config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, - ProcessRPC)); + ProcessSimpleRPC)); } std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { return std::unique_ptr( @@ -392,7 +399,7 @@ std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { grpc::GenericServerContext>( config, RegisterGenericService, nullptr, &grpc::AsyncGenericService::RequestCall, - ProcessRPC)); + ProcessGenericRPC)); } } // namespace testing From 083dc62cf60f66b40cf320f4d3c4c9d4791e1526 Mon Sep 17 00:00:00 2001 From: vjpai Date: Mon, 11 Jan 2016 09:41:17 -0800 Subject: [PATCH 08/12] Generic test --- build.yaml | 17 ++++ .../generic_async_streaming_ping_pong_test.cc | 82 +++++++++++++++++++ test/cpp/qps/qps_driver.cc | 9 ++ 3 files changed, 108 insertions(+) create mode 100644 test/cpp/qps/generic_async_streaming_ping_pong_test.cc diff --git a/build.yaml b/build.yaml index 6227b18b7d6..9bc63d34cf3 100644 --- a/build.yaml +++ b/build.yaml @@ -1950,6 +1950,23 @@ targets: - grpc - gpr_test_util - gpr +- name: generic_async_streaming_ping_pong_test + build: test + language: c++ + src: + - test/cpp/qps/generic_async_streaming_ping_pong_test.cc + deps: + - qps + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr + platforms: + - mac + - linux + - posix - name: generic_end2end_test build: test language: c++ diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc new file mode 100644 index 00000000000..fe79b2d8037 --- /dev/null +++ b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc @@ -0,0 +1,82 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#include + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 10; + +static void RunGenericAsyncStreamingPingPong() { + gpr_log(GPR_INFO, "Running Generic Async Streaming Ping Pong"); + + ClientConfig client_config; + client_config.set_client_type(ASYNC_CLIENT); + client_config.set_outstanding_rpcs_per_channel(1); + client_config.set_client_channels(1); + client_config.set_async_client_threads(1); + client_config.set_rpc_type(STREAMING); + client_config.mutable_load_params()->mutable_closed_loop(); + auto bbuf = client_config.mutable_payload_config()->mutable_bytebuf_params(); + bbuf->set_resp_size(0); + bbuf->set_req_size(0); + + ServerConfig server_config; + server_config.set_server_type(ASYNC_SERVER); + server_config.set_host("localhost"); + server_config.set_async_server_threads(1); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPS(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + grpc::testing::RunGenericAsyncStreamingPingPong(); + return 0; +} diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index c7096391e63..a8d404590c4 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -62,6 +62,8 @@ DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(simple_req_size, -1, "Simple proto request payload size"); DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size"); +DEFINE_int32(bbuf_req_size, -1, "Byte-buffer request payload size"); +DEFINE_int32(bbuf_resp_size, -1, "Byte-buffer response payload size"); DEFINE_string(client_type, "SYNC_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); @@ -109,6 +111,13 @@ static void QpsDriver() { if (FLAGS_simple_req_size >= 0) { params->set_req_size(FLAGS_simple_req_size); } + } else if (FLAGS_bbuf_resp_size >= 0) { + auto params = + client_config.mutable_payload_config()->mutable_bytebuf_params(); + params->set_resp_size(FLAGS_bbuf_resp_size); + if (FLAGS_bbuf_req_size >= 0) { + params->set_req_size(FLAGS_bbuf_req_size); + } } else { // set a reasonable default: proto but no payload client_config.mutable_payload_config()->mutable_simple_params(); From af2e1cf080037d44a641c69f5b566675bb9876c2 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 11 Jan 2016 09:54:23 -0800 Subject: [PATCH 09/12] Fix bugs and include build files --- Makefile | 46 ++++++++++++++++++++++++ test/cpp/qps/client.h | 23 ++++++------ test/cpp/qps/qps_worker.cc | 4 ++- tools/run_tests/sources_and_headers.json | 17 +++++++++ tools/run_tests/tests.json | 17 +++++++++ 5 files changed, 94 insertions(+), 13 deletions(-) diff --git a/Makefile b/Makefile index 1c816d3a480..e6579a5b9d5 100644 --- a/Makefile +++ b/Makefile @@ -903,6 +903,7 @@ cxx_slice_test: $(BINDIR)/$(CONFIG)/cxx_slice_test cxx_string_ref_test: $(BINDIR)/$(CONFIG)/cxx_string_ref_test cxx_time_test: $(BINDIR)/$(CONFIG)/cxx_time_test end2end_test: $(BINDIR)/$(CONFIG)/end2end_test +generic_async_streaming_ping_pong_test: $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test generic_end2end_test: $(BINDIR)/$(CONFIG)/generic_end2end_test grpc_cli: $(BINDIR)/$(CONFIG)/grpc_cli grpc_cpp_plugin: $(BINDIR)/$(CONFIG)/grpc_cpp_plugin @@ -1254,6 +1255,7 @@ buildtests_cxx: buildtests_zookeeper privatelibs_cxx \ $(BINDIR)/$(CONFIG)/cxx_string_ref_test \ $(BINDIR)/$(CONFIG)/cxx_time_test \ $(BINDIR)/$(CONFIG)/end2end_test \ + $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test \ $(BINDIR)/$(CONFIG)/generic_end2end_test \ $(BINDIR)/$(CONFIG)/grpc_cli \ $(BINDIR)/$(CONFIG)/interop_client \ @@ -1551,6 +1553,8 @@ test_cxx: test_zookeeper buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/cxx_time_test || ( echo test cxx_time_test failed ; exit 1 ) $(E) "[RUN] Testing end2end_test" $(Q) $(BINDIR)/$(CONFIG)/end2end_test || ( echo test end2end_test failed ; exit 1 ) + $(E) "[RUN] Testing generic_async_streaming_ping_pong_test" + $(Q) $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test || ( echo test generic_async_streaming_ping_pong_test failed ; exit 1 ) $(E) "[RUN] Testing generic_end2end_test" $(Q) $(BINDIR)/$(CONFIG)/generic_end2end_test || ( echo test generic_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing interop_test" @@ -10026,6 +10030,48 @@ endif endif +GENERIC_ASYNC_STREAMING_PING_PONG_TEST_SRC = \ + test/cpp/qps/generic_async_streaming_ping_pong_test.cc \ + +GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test: $(PROTOBUF_DEP) $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/qps/generic_async_streaming_ping_pong_test.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a +deps_generic_async_streaming_ping_pong_test: $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS:.o=.dep) +endif +endif + + GENERIC_END2END_TEST_SRC = \ test/cpp/end2end/generic_end2end_test.cc \ diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 1b5a3d4a078..e4659d1a287 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -112,18 +112,16 @@ class ClientRequestCreator { public: ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) { if (payload_config.has_bytebuf_params()) { - if (payload_config.bytebuf_params().req_size() > 0) { - std::unique_ptr buf( - new char[payload_config.bytebuf_params().req_size()]); - gpr_slice s = gpr_slice_from_copied_buffer( - buf.get(), payload_config.bytebuf_params().req_size()); - Slice slice(s, Slice::STEAL_REF); - *req = ByteBuffer(&slice, 1); - // std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); - // req->MoveFrom(bbuf.get()); - } else { - GPR_ASSERT(false); // not appropriate for this specialization - } + std::unique_ptr buf( + new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = gpr_slice_from_copied_buffer( + buf.get(), payload_config.bytebuf_params().req_size()); + Slice slice(s, Slice::STEAL_REF); + *req = ByteBuffer(&slice, 1); + // std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); + // req->MoveFrom(bbuf.get()); + } else { + GPR_ASSERT(false); // not appropriate for this specialization } } }; @@ -375,6 +373,7 @@ std::unique_ptr CreateSynchronousStreamingClient( const ClientConfig& args); std::unique_ptr CreateAsyncUnaryClient(const ClientConfig& args); std::unique_ptr CreateAsyncStreamingClient(const ClientConfig& args); +std::unique_ptr CreateGenericAsyncStreamingClient(const ClientConfig& args); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index dc59eab7ef6..ea5bb4cd3ca 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -69,7 +69,9 @@ static std::unique_ptr CreateClient(const ClientConfig& config) { case ClientType::ASYNC_CLIENT: return (config.rpc_type() == RpcType::UNARY) ? CreateAsyncUnaryClient(config) - : CreateAsyncStreamingClient(config); + : (config.payload_config().has_bytebuf_params() + ? CreateGenericAsyncStreamingClient(config) + : CreateAsyncStreamingClient(config)); default: abort(); } diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 75debf41bb4..6ead6cc36f2 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -1497,6 +1497,23 @@ "test/cpp/end2end/end2end_test.cc" ] }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util", + "qps" + ], + "headers": [], + "language": "c++", + "name": "generic_async_streaming_ping_pong_test", + "src": [ + "test/cpp/qps/generic_async_streaming_ping_pong_test.cc" + ] + }, { "deps": [ "gpr", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 0b5f847af18..7c8476d9310 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -1823,6 +1823,23 @@ "windows" ] }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "exclude_configs": [], + "flaky": false, + "language": "c++", + "name": "generic_async_streaming_ping_pong_test", + "platforms": [ + "linux", + "mac", + "posix" + ] + }, { "args": [], "ci_platforms": [ From 78268212bae4e745606108beeeff317af393e033 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 11 Jan 2016 09:55:02 -0800 Subject: [PATCH 10/12] clang-format --- test/cpp/qps/client.h | 3 ++- test/cpp/qps/qps_worker.cc | 16 ++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index e4659d1a287..4a915a7801a 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -373,7 +373,8 @@ std::unique_ptr CreateSynchronousStreamingClient( const ClientConfig& args); std::unique_ptr CreateAsyncUnaryClient(const ClientConfig& args); std::unique_ptr CreateAsyncStreamingClient(const ClientConfig& args); -std::unique_ptr CreateGenericAsyncStreamingClient(const ClientConfig& args); +std::unique_ptr CreateGenericAsyncStreamingClient( + const ClientConfig& args); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index ea5bb4cd3ca..765dea8d8d4 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -36,20 +36,20 @@ #include #include #include +#include #include #include #include -#include +#include +#include +#include +#include #include #include #include -#include #include -#include -#include -#include -#include +#include #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/client.h" @@ -70,8 +70,8 @@ static std::unique_ptr CreateClient(const ClientConfig& config) { return (config.rpc_type() == RpcType::UNARY) ? CreateAsyncUnaryClient(config) : (config.payload_config().has_bytebuf_params() - ? CreateGenericAsyncStreamingClient(config) - : CreateAsyncStreamingClient(config)); + ? CreateGenericAsyncStreamingClient(config) + : CreateAsyncStreamingClient(config)); default: abort(); } From b30bc2aad6438cf89c11569fac436c83a7133f4b Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 11 Jan 2016 10:11:50 -0800 Subject: [PATCH 11/12] Remove dead code and a thus unneeded change to C++ bbuf --- include/grpc++/support/byte_buffer.h | 3 --- src/cpp/util/byte_buffer.cc | 6 ------ test/cpp/qps/client.h | 2 -- 3 files changed, 11 deletions(-) diff --git a/include/grpc++/support/byte_buffer.h b/include/grpc++/support/byte_buffer.h index a089dbb4cb5..d3f9b7b1d1f 100644 --- a/include/grpc++/support/byte_buffer.h +++ b/include/grpc++/support/byte_buffer.h @@ -72,9 +72,6 @@ class ByteBuffer GRPC_FINAL { /// Buffer size in bytes. size_t Length() const; - /// Move contents from \a bbuf and clear \a bbuf - void MoveFrom(ByteBuffer* bbuf); - private: friend class SerializationTraits; diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index 91e202023c5..5245739c89b 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -79,12 +79,6 @@ size_t ByteBuffer::Length() const { } } -void ByteBuffer::MoveFrom(ByteBuffer* bbuf) { - Clear(); // in case we already had something, but we shouldn't use this then - buffer_ = bbuf->buffer_; - bbuf->buffer_ = nullptr; -} - ByteBuffer::ByteBuffer(const ByteBuffer& buf): buffer_(grpc_byte_buffer_copy(buf.buffer_)) { } diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 4a915a7801a..cce36933f6c 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -118,8 +118,6 @@ class ClientRequestCreator { buf.get(), payload_config.bytebuf_params().req_size()); Slice slice(s, Slice::STEAL_REF); *req = ByteBuffer(&slice, 1); - // std::unique_ptr bbuf(new ByteBuffer(&slice, 1)); - // req->MoveFrom(bbuf.get()); } else { GPR_ASSERT(false); // not appropriate for this specialization } From 70a043855e2a01d8281b364b71cf35c5b89868da Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 13 Jan 2016 12:27:05 -0800 Subject: [PATCH 12/12] Regen projects --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 0905f7d705e..aa97775c0ce 100644 --- a/Makefile +++ b/Makefile @@ -9362,6 +9362,7 @@ endif endif $(OBJDIR)/$(CONFIG)/test/cpp/qps/generic_async_streaming_ping_pong_test.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + deps_generic_async_streaming_ping_pong_test: $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS:.o=.dep) ifneq ($(NO_SECURE),true)