Update code according to comment

pull/3109/head
Yang Gao 10 years ago
parent be8ac56e93
commit deba4a99fe
  1. 81
      cpp/helloworld/greeter_async_server.cc

@ -57,16 +57,13 @@ using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
static bool got_sigint = false;
class ServerImpl final {
public:
ServerImpl() : service_(&service_cq_) {}
ServerImpl() : service_(&cq_) {}
~ServerImpl() {
server_->Shutdown();
rpc_cq_.Shutdown();
service_cq_.Shutdown();
cq_.Shutdown();
}
// There is no shutdown handling in this code.
@ -79,46 +76,56 @@ class ServerImpl final {
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
while (true) {
CallData* rpc = new CallData();
service_.RequestSayHello(&rpc->ctx, &rpc->request, &rpc->responder,
&rpc_cq_, rpc);
void* got_tag;
bool ok;
service_cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
GPR_ASSERT(got_tag == rpc);
std::thread t(&ServerImpl::HandleRpc, this, rpc);
t.detach();
}
HandleRpcs();
}
private:
struct CallData {
CallData() : responder(&ctx) {}
ServerContext ctx;
HelloRequest request;
HelloReply reply;
ServerAsyncResponseWriter<HelloReply> responder;
class CallData {
public:
CallData(Greeter::AsyncService* service, CompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, this);
status_ = PROCESS;
} else if (status_ == PROCESS) {
new CallData(service_, cq_);
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
responder_.Finish(reply_, Status::OK, this);
status_ = FINISH;
} else {
delete this;
}
}
private:
Greeter::AsyncService* service_;
CompletionQueue* cq_;
ServerContext ctx_;
HelloRequest request_;
HelloReply reply_;
ServerAsyncResponseWriter<HelloReply> responder_;
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
};
// Runs in a detached thread, processes rpc then deletes data.
void HandleRpc(CallData* rpc) {
std::string prefix("Hello ");
rpc->reply.set_message(prefix + rpc->request.name());
rpc->responder.Finish(rpc->reply, Status::OK, &rpc->ctx);
void* got_tag;
// This can be run in multiple threads if needed.
void HandleRpcs() {
new CallData(&service_, &cq_);
void* tag;
bool ok;
rpc_cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok);
GPR_ASSERT(got_tag == &rpc->ctx);
delete rpc;
while (true) {
cq_.Next(&tag, &ok);
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
CompletionQueue service_cq_;
CompletionQueue rpc_cq_;
CompletionQueue cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};

Loading…
Cancel
Save