Merge pull request #920 from vjpai/async-probs

Solving crashes in async tests
pull/939/head
Craig Tiller 10 years ago
commit c82c36b400
  1. 5
      src/cpp/server/server.cc
  2. 20
      test/cpp/qps/server_async.cc

@ -324,6 +324,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
*tag = tag_; *tag = tag_;
bool orig_status = *status;
if (*status && request_) { if (*status && request_) {
if (payload_) { if (payload_) {
*status = DeserializeProto(payload_, request_); *status = DeserializeProto(payload_, request_);
@ -343,7 +344,9 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
} }
ctx_->call_ = call_; ctx_->call_ = call_;
Call call(call_, server_, cq_); Call call(call_, server_, cq_);
ctx_->BeginCompletionOp(&call); if (orig_status && call_) {
ctx_->BeginCompletionOp(&call);
}
// just the pointers inside call are copied here // just the pointers inside call are copied here
stream_->BindCall(&call); stream_->BindCall(&call);
delete this; delete this;

@ -143,21 +143,24 @@ class AsyncQpsServerTest {
delete contexts_.front(); delete contexts_.front();
contexts_.pop_front(); contexts_.pop_front();
} }
for (auto& thr: threads_) {
thr.join();
}
} }
void ServeRpcs(int num_threads) { void ServeRpcs(int num_threads) {
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
threads.push_back(std::thread([=]() { threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down // Wait until work is available or we are shutting down
bool ok; bool ok;
void *got_tag; void *got_tag;
while (srv_cq_.Next(&got_tag, &ok)) { while (srv_cq_.Next(&got_tag, &ok)) {
EXPECT_EQ(ok, true); if (ok) {
ServerRpcContext *ctx = detag(got_tag); ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState() == false) { if (ctx->RunNextState() == false) {
// this RPC context is done, so refresh it // this RPC context is done, so refresh it
ctx->Reset(); ctx->Reset();
}
} }
} }
return; return;
@ -260,6 +263,7 @@ class AsyncQpsServerTest {
} }
CompletionQueue srv_cq_; CompletionQueue srv_cq_;
TestService::AsyncService async_service_; TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
std::unique_ptr<Server> server_; std::unique_ptr<Server> server_;
std::function<void(ServerContext *, SimpleRequest *, std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>

Loading…
Cancel
Save