diff --git a/include/grpc++/generic/async_generic_service.h b/include/grpc++/generic/async_generic_service.h index 8578d850ffd..f209c077e6c 100644 --- a/include/grpc++/generic/async_generic_service.h +++ b/include/grpc++/generic/async_generic_service.h @@ -61,6 +61,7 @@ class AsyncGenericService GRPC_FINAL { // TODO(yangg) Once we can add multiple completion queues to the server // in c core, add a CompletionQueue* argument to the ctor here. // TODO(yangg) support methods list. + AsyncGenericService() : server_(nullptr) {} AsyncGenericService(const grpc::string& methods) : server_(nullptr) {} void RequestCall(GenericServerContext* ctx, diff --git a/include/grpc++/support/byte_buffer.h b/include/grpc++/support/byte_buffer.h index 3adf86c4794..a089dbb4cb5 100644 --- a/include/grpc++/support/byte_buffer.h +++ b/include/grpc++/support/byte_buffer.h @@ -61,6 +61,8 @@ class ByteBuffer GRPC_FINAL { ~ByteBuffer(); + ByteBuffer& operator=(const ByteBuffer&); + /// Dump (read) the buffer contents into \a slices. void Dump(std::vector* slices) const; @@ -76,8 +78,6 @@ class ByteBuffer GRPC_FINAL { private: friend class SerializationTraits; - ByteBuffer& operator=(const ByteBuffer&); - // takes ownership void set_buffer(grpc_byte_buffer* buf) { if (buffer_) { diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index 467f0007de7..91e202023c5 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -89,4 +89,10 @@ ByteBuffer::ByteBuffer(const ByteBuffer& buf): buffer_(grpc_byte_buffer_copy(buf.buffer_)) { } +ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { + Clear(); // first remove existing data + buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy + return *this; +} + } // namespace grpc diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c151918ce4c..85a47ff71d8 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -55,9 +56,15 @@ namespace grpc { namespace testing { +template class AsyncQpsServerTest : public Server { public: - explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) { + AsyncQpsServerTest(const ServerConfig &config, + std::function register_service, + std::function*, CompletionQueue *, ServerCompletionQueue *, void *)> request_unary_function, + std::function*, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, + std::function process_rpc) + : Server(config) { char *server_address = NULL; gpr_join_host_port(&server_address, config.host().c_str(), port()); @@ -67,7 +74,8 @@ class AsyncQpsServerTest : public Server { Server::CreateServerCredentials(config)); gpr_free(server_address); - builder.RegisterAsyncService(&async_service_); + register_service(&builder, &async_service_); + for (int i = 0; i < config.async_server_threads(); i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } @@ -75,22 +83,27 @@ class AsyncQpsServerTest : public Server { server_ = builder.BuildAndStart(); using namespace std::placeholders; + + auto process_rpc_bound = std::bind(process_rpc, config, _1, _2); + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { - auto request_unary = std::bind( - &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_, - _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); - auto request_streaming = std::bind( - &BenchmarkService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); - contexts_.push_front( - new ServerRpcContextUnaryImpl( - request_unary, ProcessRPC)); - contexts_.push_front( - new ServerRpcContextStreamingImpl( - request_streaming, ProcessRPC)); + if (request_unary_function) { + auto request_unary = std::bind( + request_unary_function, &async_service_, + _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + contexts_.push_front(new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); + } + if (request_streaming_function) { + auto request_streaming = std::bind( + request_streaming_function, + &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front(new ServerRpcContextStreamingImpl( + request_streaming, process_rpc_bound)); + } } } + for (int i = 0; i < config.async_server_threads(); i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } @@ -155,16 +168,15 @@ class AsyncQpsServerTest : public Server { return reinterpret_cast(tag); } - template class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextUnaryImpl( - std::function *, void *)> request_method, std::function invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), request_method_(request_method), invoke_method_(invoke_method), @@ -177,7 +189,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); response_writer_ = grpc::ServerAsyncResponseWriter(srv_ctx_.get()); @@ -205,10 +217,10 @@ class AsyncQpsServerTest : public Server { response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this)); return true; } - std::unique_ptr srv_ctx_; + std::unique_ptr srv_ctx_; RequestType req_; bool (ServerRpcContextUnaryImpl::*next_state_)(bool); - std::function *, void *)> request_method_; std::function @@ -216,16 +228,15 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncResponseWriter response_writer_; }; - template class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function *, void *)> request_method, std::function invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), request_method_(request_method), invoke_method_(invoke_method), @@ -237,7 +248,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); stream_ = grpc::ServerAsyncReaderWriter( srv_ctx_.get()); @@ -286,11 +297,11 @@ class AsyncQpsServerTest : public Server { } bool finish_done(bool ok) { return false; /* reset the context */ } - std::unique_ptr srv_ctx_; + std::unique_ptr srv_ctx_; RequestType req_; bool (ServerRpcContextStreamingImpl::*next_state_)(bool); std::function *, void *)> request_method_; std::function @@ -298,20 +309,10 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncReaderWriter stream_; }; - static Status ProcessRPC(const SimpleRequest *request, - SimpleResponse *response) { - if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } - } - return Status::OK; - } std::vector threads_; std::unique_ptr server_; std::vector> srv_cqs_; - BenchmarkService::AsyncService async_service_; + ServiceType async_service_; std::forward_list contexts_; class PerThreadShutdownState { @@ -335,8 +336,39 @@ class AsyncQpsServerTest : public Server { std::vector> shutdown_state_; }; +static void RegisterBenchmarkService(ServerBuilder *builder, + BenchmarkService::AsyncService *service) { + builder->RegisterAsyncService(service); +} +static void RegisterGenericService(ServerBuilder *builder, + grpc::AsyncGenericService *service) { + builder->RegisterAsyncGenericService(service); +} + +template +Status ProcessRPC(const ServerConfig &config, const RequestType *request, + ResponseType *response) { + if (request->response_size() > 0) { + if (!Server::SetPayload(request->response_type(), request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; +} + +template<> +Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request, + ByteBuffer *response) { + return Status::OK; +} + + std::unique_ptr CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr(new AsyncQpsServerTest(config)); + return std::unique_ptr(new AsyncQpsServerTest(config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, ProcessRPC)); +} +std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { + return std::unique_ptr(new AsyncQpsServerTest(config, RegisterGenericService, nullptr, &grpc::AsyncGenericService::RequestCall, ProcessRPC)); } } // namespace testing