|
|
|
@ -206,21 +206,28 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
|
void* got_tag; |
|
|
|
|
bool ok; |
|
|
|
|
|
|
|
|
|
if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { |
|
|
|
|
// Got a regular event, so process it
|
|
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
|
|
|
|
if (!ctx->RunNextState(ok, histogram)) { |
|
|
|
|
// The RPC and callback are done, so clone the ctx
|
|
|
|
|
// and kickstart the new one
|
|
|
|
|
auto clone = ctx->StartNewClone(); |
|
|
|
|
clone->Start(cli_cqs_[thread_idx].get()); |
|
|
|
|
// delete the old version
|
|
|
|
|
delete ctx; |
|
|
|
|
switch (cli_cqs_[thread_idx]->AsyncNext( |
|
|
|
|
&got_tag, &ok, |
|
|
|
|
std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { |
|
|
|
|
case CompletionQueue::SHUTDOWN: |
|
|
|
|
return false; |
|
|
|
|
case CompletionQueue::GOT_EVENT: { |
|
|
|
|
// Got a regular event, so process it
|
|
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
|
|
|
|
if (!ctx->RunNextState(ok, histogram)) { |
|
|
|
|
// The RPC and callback are done, so clone the ctx
|
|
|
|
|
// and kickstart the new one
|
|
|
|
|
auto clone = ctx->StartNewClone(); |
|
|
|
|
clone->Start(cli_cqs_[thread_idx].get()); |
|
|
|
|
// delete the old version
|
|
|
|
|
delete ctx; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} else { // queue is shutting down
|
|
|
|
|
return false; |
|
|
|
|
case CompletionQueue::TIMEOUT: |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
GPR_UNREACHABLE_CODE(return false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|