Merge pull request #1706 from ctiller/streaming-crash-boom-bang

Fix shutdown crash in async streaming test
pull/1598/merge
Nicolas Noble 10 years ago
commit 5fa2eab4f4
  1. 15
      test/cpp/qps/server_async.cc

@ -99,12 +99,15 @@ class AsyncQpsServerTest : public Server {
while (srv_cq_->Next(&got_tag, &ok)) { while (srv_cq_->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag); ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState(ok) == false) { bool still_going = ctx->RunNextState(ok);
std::lock_guard<std::mutex> g(shutdown_mutex_);
if (!shutdown_) {
// this RPC context is done, so refresh it // this RPC context is done, so refresh it
std::lock_guard<std::mutex> g(shutdown_mutex_); if (!still_going) {
if (!shutdown_) {
ctx->Reset(); ctx->Reset();
} }
} else {
return;
} }
} }
return; return;
@ -116,11 +119,15 @@ class AsyncQpsServerTest : public Server {
{ {
std::lock_guard<std::mutex> g(shutdown_mutex_); std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true; shutdown_ = true;
srv_cq_->Shutdown();
} }
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join(); thr->join();
} }
srv_cq_->Shutdown();
bool ok;
void *got_tag;
while (srv_cq_->Next(&got_tag, &ok))
;
while (!contexts_.empty()) { while (!contexts_.empty()) {
delete contexts_.front(); delete contexts_.front();
contexts_.pop_front(); contexts_.pop_front();

Loading…
Cancel
Save