diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 8273ef2f4a1..29deef658f0 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -141,6 +141,9 @@ class CallbackWithSuccessTag // that are detected before the operations are internally processed. void force_run(bool ok) { Run(ok); } + /// check if this tag has ever been set + operator bool() const { return call_ != nullptr; } + private: grpc_call* call_; std::function func_; @@ -153,13 +156,19 @@ class CallbackWithSuccessTag void Run(bool ok) { void* ignored = ops_; bool new_ok = ok; - GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + // Allow a "false" return value from FinalizeResult to silence the + // callback, just as it silences a CQ tag in the async cases + bool do_callback = ops_->FinalizeResult(&ignored, &new_ok); GPR_CODEGEN_ASSERT(ignored == ops_); - // Last use of func_, so ok to move it out for rvalue call above - auto func = std::move(func_); - func_ = nullptr; // reset to clear this out for sure - CatchingCallback(std::move(func), ok); + if (do_callback) { + // Last use of func_, so ok to move it out for rvalue call above + auto func = std::move(func_); + func_ = nullptr; // reset to clear this out for sure + CatchingCallback(std::move(func), ok); + } else { + func_ = nullptr; // reset to clear this out for sure + } g_core_codegen_interface->grpc_call_unref(call_); } }; diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index ecb9073cf92..82ee862f61b 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -139,7 +140,7 @@ class ServerContext { /// must end in "-bin". void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); - /// IsCancelled is always safe to call when using sync API. + /// IsCancelled is always safe to call when using sync or callback API. /// When using async API, it is only safe to call IsCancelled after /// the AsyncNotifyWhenDone tag has been delivered. bool IsCancelled() const; @@ -281,7 +282,7 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(internal::Call* call); + void BeginCompletionOp(internal::Call* call, bool callback); /// Return the tag queued by BeginCompletionOp() internal::CompletionQueueTag* GetCompletionOpTag(); @@ -312,6 +313,7 @@ class ServerContext { CompletionOp* completion_op_; bool has_notify_when_done_tag_; void* async_notify_when_done_tag_; + internal::CallbackWithSuccessTag completion_tag_; gpr_timespec deadline_; grpc_call* call_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 4ce2a46a09d..a9f0315af63 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -279,7 +279,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void ContinueRunAfterInterception() { { - ctx_.BeginCompletionOp(&call_); + ctx_.BeginCompletionOp(&call_, false); global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); @@ -444,7 +444,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { } } void ContinueRunAfterInterception() { - // req_->ctx_.BeginCompletionOp(call_); + req_->ctx_.BeginCompletionOp(call_, true); req_->method_->handler()->RunHandler( internal::MethodHandler::HandlerParameter( call_, &req_->ctx_, req_->request_, req_->request_status_, @@ -994,7 +994,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } } if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_); + context_->BeginCompletionOp(&call_wrapper_, false); } *tag = tag_; if (delete_on_finalize_) { @@ -1005,7 +1005,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, void ServerInterface::BaseAsyncRequest:: ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_); + context_->BeginCompletionOp(&call_wrapper_, false); // Queue a tag which will be returned immediately grpc_core::ExecCtx exec_ctx; grpc_cq_begin_op(notification_cq_->cq(), this); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 51a2689c6a5..8ded7a8972f 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -45,11 +45,18 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { : call_(*call), has_tag_(false), tag_(nullptr), + cq_tag_(this), refs_(2), finalized_(false), cancelled_(0), done_intercepting_(false) {} + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + ~CompletionOp() { if (call_.server_rpc_info()) { call_.server_rpc_info()->Unref(); @@ -85,8 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } - /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API - void* cq_tag() override { return this; } + void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + + void* cq_tag() override { return cq_tag_; } void Unref(); @@ -130,6 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { internal::Call call_; bool has_tag_; void* tag_; + void* cq_tag_; std::mutex mu_; int refs_; bool finalized_; @@ -158,7 +167,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { interceptor_methods_.SetReverse(); interceptor_methods_.SetCallOpSetInterface(this); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), &ops, 1, this, nullptr)); + grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr)); /* No interceptors to run here */ } @@ -251,7 +260,7 @@ void ServerContext::Clear() { // either called from destructor or just before Setup } -void ServerContext::BeginCompletionOp(internal::Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { GPR_ASSERT(!completion_op_); if (rpc_info_) { rpc_info_->Ref(); @@ -260,7 +269,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call) { completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call); - if (has_notify_when_done_tag_) { + if (callback) { + completion_tag_ = + internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_); + completion_op_->set_cq_tag(&completion_tag_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -289,12 +302,15 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - if (has_notify_when_done_tag_) { - // when using async API, but the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. + return completion_op_->CheckCancelledAsync(); + } else if (completion_tag_ || has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } }