From fd88dcaf5537839a30e4f958a01c311f3e876c26 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 1 Nov 2018 11:58:46 -0700 Subject: [PATCH] Add cancellation notification --- .../grpcpp/impl/codegen/interceptor_common.h | 99 +++++++++++++++++++ src/cpp/client/client_context.cc | 7 +- src/cpp/server/server_context.cc | 6 ++ 3 files changed, 109 insertions(+), 3 deletions(-) diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h index d47e28d615f..f520e7905a1 100644 --- a/include/grpcpp/impl/codegen/interceptor_common.h +++ b/include/grpcpp/impl/codegen/interceptor_common.h @@ -352,6 +352,105 @@ class InterceptorBatchMethodsImpl MetadataMap* recv_trailing_metadata_ = nullptr; }; +// A special implementation of InterceptorBatchMethods to send a Cancel +// notification down the interceptor stack +class CancelInterceptorBatchMethods + : public experimental::InterceptorBatchMethods { + public: + bool QueryInterceptionHookPoint( + experimental::InterceptionHookPoints type) override { + if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) { + return true; + } else { + return false; + } + } + + void Proceed() override { + // This is a no-op. For actual continuation of the RPC simply needs to + // return from the Intercept method + } + + void Hijack() override { + // Only the client can hijack when sending down initial metadata + GPR_CODEGEN_ASSERT(false && + "It is illegal to call Hijack on a method which has a " + "Cancel notification"); + } + + ByteBuffer* GetSendMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap* GetSendInitialMetadata() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendInitialMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + Status GetSendStatus() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendStatus on a method which " + "has a Cancel notification"); + return Status(); + } + + void ModifySendStatus(const Status& status) override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call ModifySendStatus on a method " + "which has a Cancel notification"); + return; + } + + std::multimap* GetSendTrailingMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendTrailingMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + void* GetRecvMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap* GetRecvInitialMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvInitialMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + Status* GetRecvStatus() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvStatus on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap* GetRecvTrailingMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvTrailingMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + std::unique_ptr GetInterceptedChannel() { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetInterceptedChannel on a " + "method which has a Cancel notification"); + return std::unique_ptr(nullptr); + } +}; } // namespace internal } // namespace grpc diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 8cdcddca87e..d5eb029a24e 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -111,9 +111,10 @@ void ClientContext::set_compression_algorithm( void ClientContext::TryCancel() { std::unique_lock lock(mu_); if (call_) { - // for(size_t i = 0; i < rpc_info_.interceptors_.size(); i++) { - // rpc_info_.RunInterceptor(, 0); - //} + internal::CancelInterceptorBatchMethods cancel_methods; + for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) { + rpc_info_.RunInterceptor(&cancel_methods, i); + } grpc_call_cancel(call_, nullptr); } else { call_canceled_ = true; diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 995e7877854..e61fec99665 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -275,6 +275,12 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key, } void ServerContext::TryCancel() const { + internal::CancelInterceptorBatchMethods cancel_methods; + if (rpc_info_) { + for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) { + rpc_info_->RunInterceptor(&cancel_methods, i); + } + } grpc_call_error err = grpc_call_cancel_with_status( call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr); if (err != GRPC_CALL_OK) {