More changes needed for generic support

pull/4670/head
Vijay Pai 9 years ago
parent 93beeb8895
commit 9f991e252d
  1. 5
      test/cpp/qps/client.h
  2. 18
      test/cpp/qps/client_async.cc
  3. 27
      test/cpp/qps/server_async.cc

@ -118,8 +118,9 @@ class ClientRequestCreator<ByteBuffer> {
gpr_slice s = gpr_slice_from_copied_buffer(
buf.get(), payload_config.bytebuf_params().req_size());
Slice slice(s, Slice::STEAL_REF);
std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1));
req->MoveFrom(bbuf.get());
*req = ByteBuffer(&slice, 1);
// std::unique_ptr<ByteBuffer> bbuf(new ByteBuffer(&slice, 1));
// req->MoveFrom(bbuf.get());
} else {
GPR_ASSERT(false); // not appropriate for this specialization
}

@ -488,9 +488,9 @@ class AsyncStreamingClient GRPC_FINAL
}
};
class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
public:
ClientGenericRpcContextStreamingImpl(
ClientRpcContextGenericStreamingImpl(
int channel_id, grpc::GenericStub* stub, const ByteBuffer& req,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*,
@ -502,16 +502,16 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
stub_(stub),
req_(req),
response_(),
next_state_(&ClientGenericRpcContextStreamingImpl::ReqSent),
next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent),
callback_(on_done),
start_req_(start_req),
start_(Timer::Now()) {}
~ClientGenericRpcContextStreamingImpl() GRPC_OVERRIDE {}
~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
return (this->*next_state_)(ok, hist);
}
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
return new ClientGenericRpcContextStreamingImpl(channel_id_, stub_, req_,
return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_,
start_req_, callback_);
}
void Start(CompletionQueue* cq) GRPC_OVERRIDE {
@ -528,7 +528,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
return (false);
}
start_ = Timer::Now();
next_state_ = &ClientGenericRpcContextStreamingImpl::WriteDone;
next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone;
stream_->Write(req_, ClientRpcContext::tag(this));
return true;
}
@ -536,7 +536,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
if (!ok) {
return (false);
}
next_state_ = &ClientGenericRpcContextStreamingImpl::ReadDone;
next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone;
stream_->Read(&response_, ClientRpcContext::tag(this));
return true;
}
@ -548,7 +548,7 @@ class ClientGenericRpcContextStreamingImpl : public ClientRpcContext {
grpc::GenericStub* stub_;
ByteBuffer req_;
ByteBuffer response_;
bool (ClientGenericRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ByteBuffer*)> callback_;
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
@ -587,7 +587,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
};
static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub,
const ByteBuffer& req) {
return new ClientGenericRpcContextStreamingImpl(
return new ClientRpcContextGenericStreamingImpl(
channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
GenericAsyncStreamingClient::CheckDone);
}

@ -71,7 +71,7 @@ class AsyncQpsServerTest : public Server {
ServerAsyncReaderWriter<ResponseType, RequestType> *,
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_function,
std::function<grpc::Status(const ServerConfig &, const RequestType *,
std::function<grpc::Status(const PayloadConfig &, const RequestType *,
ResponseType *)>
process_rpc)
: Server(config) {
@ -94,7 +94,8 @@ class AsyncQpsServerTest : public Server {
using namespace std::placeholders;
auto process_rpc_bound = std::bind(process_rpc, config, _1, _2);
auto process_rpc_bound = std::bind(process_rpc, config.payload_config(),
_1, _2);
for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
for (int j = 0; j < config.async_server_threads(); j++) {
@ -358,9 +359,10 @@ static void RegisterGenericService(ServerBuilder *builder,
builder->RegisterAsyncGenericService(service);
}
template <class RequestType, class ResponseType>
Status ProcessRPC(const ServerConfig &config, const RequestType *request,
ResponseType *response) {
static Status ProcessSimpleRPC(const PayloadConfig&,
const SimpleRequest *request,
SimpleResponse *response) {
if (request->response_size() > 0) {
if (!Server::SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
@ -370,9 +372,14 @@ Status ProcessRPC(const ServerConfig &config, const RequestType *request,
return Status::OK;
}
template <>
Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request,
ByteBuffer *response) {
static Status ProcessGenericRPC(const PayloadConfig& payload_config,
const ByteBuffer *request,
ByteBuffer *response) {
int resp_size = payload_config.bytebuf_params().resp_size();
std::unique_ptr<char> buf(new char[resp_size]);
gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size);
Slice slice(s, Slice::STEAL_REF);
*response = ByteBuffer(&slice, 1);
return Status::OK;
}
@ -384,7 +391,7 @@ std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
config, RegisterBenchmarkService,
&BenchmarkService::AsyncService::RequestUnaryCall,
&BenchmarkService::AsyncService::RequestStreamingCall,
ProcessRPC<SimpleRequest, SimpleResponse>));
ProcessSimpleRPC));
}
std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
return std::unique_ptr<Server>(
@ -392,7 +399,7 @@ std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
grpc::GenericServerContext>(
config, RegisterGenericService, nullptr,
&grpc::AsyncGenericService::RequestCall,
ProcessRPC<ByteBuffer, ByteBuffer>));
ProcessGenericRPC));
}
} // namespace testing

Loading…
Cancel
Save