|
|
|
@ -236,6 +236,22 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
|
this->EndThreads(); // this needed for resolution
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) { |
|
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(tag); |
|
|
|
|
if (shutdown_state_[thread_idx]->shutdown) { |
|
|
|
|
ctx->TryCancel(); |
|
|
|
|
delete ctx; |
|
|
|
|
bool ok; |
|
|
|
|
while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) { |
|
|
|
|
ctx = ClientRpcContext::detag(tag); |
|
|
|
|
ctx->TryCancel(); |
|
|
|
|
delete ctx; |
|
|
|
|
} |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
return ctx; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { |
|
|
|
|
void* got_tag; |
|
|
|
|
bool ok; |
|
|
|
@ -245,9 +261,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
|
if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); |
|
|
|
|
std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; |
|
|
|
|
shutdown_mu->lock(); |
|
|
|
|
ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag); |
|
|
|
|
if (ctx == nullptr) { |
|
|
|
|
shutdown_mu->unlock(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( |
|
|
|
|
[&, ctx, ok, entry_ptr, shutdown_mu]() { |
|
|
|
|
if (!ctx->RunNextState(ok, entry_ptr)) { |
|
|
|
@ -260,19 +280,9 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { |
|
|
|
|
}, |
|
|
|
|
&got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) { |
|
|
|
|
t->UpdateHistogram(entry_ptr); |
|
|
|
|
// Got a regular event, so process it
|
|
|
|
|
ctx = ClientRpcContext::detag(got_tag); |
|
|
|
|
// Proceed while holding a lock to make sure that
|
|
|
|
|
// this thread isn't supposed to shut down
|
|
|
|
|
shutdown_mu->lock(); |
|
|
|
|
if (shutdown_state_[thread_idx]->shutdown) { |
|
|
|
|
ctx->TryCancel(); |
|
|
|
|
delete ctx; |
|
|
|
|
while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { |
|
|
|
|
ctx = ClientRpcContext::detag(got_tag); |
|
|
|
|
ctx->TryCancel(); |
|
|
|
|
delete ctx; |
|
|
|
|
} |
|
|
|
|
ctx = ProcessTag(thread_idx, got_tag); |
|
|
|
|
if (ctx == nullptr) { |
|
|
|
|
shutdown_mu->unlock(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|