|
|
|
@ -245,9 +245,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
|
if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
ClientRpcContext* ctx; |
|
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
|
|
|
|
std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; |
|
|
|
|
do { |
|
|
|
|
shutdown_mu->lock(); |
|
|
|
|
while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( |
|
|
|
|
[&, ctx, ok, entry_ptr, shutdown_mu]() { |
|
|
|
|
if (!ctx->RunNextState(ok, entry_ptr)) { |
|
|
|
|
// The RPC and callback are done, so clone the ctx
|
|
|
|
|
// and kickstart the new one
|
|
|
|
|
ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); |
|
|
|
|
delete ctx; |
|
|
|
|
} |
|
|
|
|
shutdown_mu->unlock(); |
|
|
|
|
}, |
|
|
|
|
&got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { |
|
|
|
|
t->UpdateHistogram(entry_ptr); |
|
|
|
|
// Got a regular event, so process it
|
|
|
|
|
ctx = ClientRpcContext::detag(got_tag); |
|
|
|
@ -265,18 +276,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
|
shutdown_mu->unlock(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( |
|
|
|
|
[&, ctx, ok, entry_ptr, shutdown_mu]() { |
|
|
|
|
bool next_ok = ok; |
|
|
|
|
if (!ctx->RunNextState(next_ok, entry_ptr)) { |
|
|
|
|
// The RPC and callback are done, so clone the ctx
|
|
|
|
|
// and kickstart the new one
|
|
|
|
|
ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); |
|
|
|
|
delete ctx; |
|
|
|
|
} |
|
|
|
|
shutdown_mu->unlock(); |
|
|
|
|
}, |
|
|
|
|
&got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; |
|
|
|
|