From b335256444349361cf406b14bc846611b7249056 Mon Sep 17 00:00:00 2001 From: yang-g Date: Tue, 4 Aug 2015 14:42:06 -0700 Subject: [PATCH 1/3] Add AsyncNotifyWhenDone --- include/grpc++/server_context.h | 6 +++ src/cpp/server/server_context.cc | 13 ++++- test/cpp/end2end/async_end2end_test.cc | 74 ++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 1 deletion(-) diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index cf2732b33d2..03d2f0d128d 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -125,6 +125,11 @@ class ServerContext { const struct census_context* census_context() const; + // Async only. Has to be called before the rpc starts. + // Returns the tag in completion queue when the rpc finishes. + // IsCancelled() can then be called to check whether the rpc was cancelled. + void AsyncNotifyWhenDone(void* tag) { async_notify_when_done_tag_ = tag; } + private: friend class ::grpc::testing::InteropContextInspector; friend class ::grpc::Server; @@ -165,6 +170,7 @@ class ServerContext { void set_call(grpc_call* call); CompletionOp* completion_op_; + void* async_notify_when_done_tag_; gpr_timespec deadline_; grpc_call* call_; diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index cf19556e7a0..0d09519b28d 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -57,9 +57,12 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { bool CheckCancelled(CompletionQueue* cq); + void set_tag(void* tag) { tag_ = tag; } + void Unref(); private: + void* tag_; grpc::mutex mu_; int refs_; bool finalized_; @@ -90,18 +93,24 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { grpc::unique_lock lock(mu_); finalized_ = true; + bool ret = false; + if (tag_) { + *tag = tag_; + ret = true; + } if (!*status) cancelled_ = 1; if (--refs_ == 0) { lock.unlock(); delete this; } - return false; + return ret; } // ServerContext body ServerContext::ServerContext() : completion_op_(nullptr), + async_notify_when_done_tag_(nullptr), call_(nullptr), cq_(nullptr), sent_initial_metadata_(false) {} @@ -109,6 +118,7 @@ ServerContext::ServerContext() ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata, size_t metadata_count) : completion_op_(nullptr), + async_notify_when_done_tag_(nullptr), deadline_(deadline), call_(nullptr), cq_(nullptr), @@ -133,6 +143,7 @@ ServerContext::~ServerContext() { void ServerContext::BeginCompletionOp(Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); + completion_op_->set_tag(async_notify_when_done_tag_); call->PerformOps(completion_op_); } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index b95bdf6b9b4..9b53bdc9990 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -592,6 +592,80 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second); EXPECT_GE(server_trailing_metadata.size(), static_cast(2)); } + +// Server uses AsyncNotifyWhenDone API to check for cancellation +TEST_F(AsyncEnd2endTest, ServerCheckCancellation) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::unique_ptr > response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + srv_ctx.AsyncNotifyWhenDone(tag(5)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + + Verifier().Expect(2, true).Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + cli_ctx.TryCancel(); + Verifier().Expect(5, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + + response_reader->Finish(&recv_response, &recv_status, tag(4)); + Verifier().Expect(4, false).Verify(cq_.get()); + + EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code()); +} + +// Server uses AsyncNotifyWhenDone API to check for normal finish +TEST_F(AsyncEnd2endTest, ServerCheckDone) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::unique_ptr > response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + srv_ctx.AsyncNotifyWhenDone(tag(5)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), + cq_.get(), tag(2)); + + Verifier().Expect(2, true).Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(3)); + Verifier().Expect(3, true).Verify(cq_.get()); + Verifier().Expect(5, true).Verify(cq_.get()); + EXPECT_FALSE(srv_ctx.IsCancelled()); + + response_reader->Finish(&recv_response, &recv_status, tag(4)); + Verifier().Expect(4, true).Verify(cq_.get()); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + } // namespace } // namespace testing } // namespace grpc From d45a26ed06dfeafa41f49d17fe42a2f637ad6742 Mon Sep 17 00:00:00 2001 From: yang-g Date: Tue, 4 Aug 2015 16:36:22 -0700 Subject: [PATCH 2/3] allow null tag --- include/grpc++/server_context.h | 6 +++++- src/cpp/server/server_context.cc | 16 ++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 03d2f0d128d..23273f43e67 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -128,7 +128,10 @@ class ServerContext { // Async only. Has to be called before the rpc starts. // Returns the tag in completion queue when the rpc finishes. // IsCancelled() can then be called to check whether the rpc was cancelled. - void AsyncNotifyWhenDone(void* tag) { async_notify_when_done_tag_ = tag; } + void AsyncNotifyWhenDone(void* tag) { + has_notify_when_done_tag_ = true; + async_notify_when_done_tag_ = tag; + } private: friend class ::grpc::testing::InteropContextInspector; @@ -170,6 +173,7 @@ class ServerContext { void set_call(grpc_call* call); CompletionOp* completion_op_; + bool has_notify_when_done_tag_; void* async_notify_when_done_tag_; gpr_timespec deadline_; diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 0d09519b28d..04373397f92 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -50,18 +50,22 @@ namespace grpc { class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {} + CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {} void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; bool CheckCancelled(CompletionQueue* cq); - void set_tag(void* tag) { tag_ = tag; } + void set_tag(void* tag) { + has_tag_ = true; + tag_ = tag; + } void Unref(); private: + bool has_tag_; void* tag_; grpc::mutex mu_; int refs_; @@ -94,7 +98,7 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { grpc::unique_lock lock(mu_); finalized_ = true; bool ret = false; - if (tag_) { + if (has_tag_) { *tag = tag_; ret = true; } @@ -110,6 +114,7 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { ServerContext::ServerContext() : completion_op_(nullptr), + has_notify_when_done_tag_(false), async_notify_when_done_tag_(nullptr), call_(nullptr), cq_(nullptr), @@ -118,6 +123,7 @@ ServerContext::ServerContext() ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata, size_t metadata_count) : completion_op_(nullptr), + has_notify_when_done_tag_(false), async_notify_when_done_tag_(nullptr), deadline_(deadline), call_(nullptr), @@ -143,7 +149,9 @@ ServerContext::~ServerContext() { void ServerContext::BeginCompletionOp(Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); - completion_op_->set_tag(async_notify_when_done_tag_); + if (has_notify_when_done_tag_) { + completion_op_->set_tag(async_notify_when_done_tag_); + } call->PerformOps(completion_op_); } From b62a90602b06726c807799af2340917656c02d43 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 5 Aug 2015 09:02:10 -0700 Subject: [PATCH 3/3] regenerate projects --- Makefile | 1 - 1 file changed, 1 deletion(-) diff --git a/Makefile b/Makefile index d9ede7a6e65..c4c73db76c5 100644 --- a/Makefile +++ b/Makefile @@ -18739,7 +18739,6 @@ test/cpp/qps/timer.cc: $(OPENSSL_DEP) test/cpp/util/benchmark_config.cc: $(OPENSSL_DEP) test/cpp/util/cli_call.cc: $(OPENSSL_DEP) test/cpp/util/create_test_channel.cc: $(OPENSSL_DEP) -test/cpp/util/fake_credentials.cc: $(OPENSSL_DEP) test/cpp/util/subprocess.cc: $(OPENSSL_DEP) test/cpp/util/test_config.cc: $(OPENSSL_DEP) endif