diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index b2100c68b7f..310bea93ca7 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -303,6 +303,18 @@ class CallOpSendMessage { template Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; + /// Send \a message using \a options for the write. The \a options are cleared + /// after use. This form of SendMessage allows gRPC to reference \a message + /// beyond the lifetime of SendMessage. + template + Status SendMessagePtr(const M* message, + WriteOptions options) GRPC_MUST_USE_RESULT; + + /// This form of SendMessage allows gRPC to reference \a message beyond the + /// lifetime of SendMessage. + template + Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT; + protected: void AddOp(grpc_op* ops, size_t* nops) { if (!send_buf_.Valid() || hijacked_) return; @@ -321,14 +333,14 @@ class CallOpSendMessage { if (!send_buf_.Valid()) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); - interceptor_methods->SetSendMessage(&send_buf_); + interceptor_methods->SetSendMessage(&send_buf_, msg_); } void SetFinishInterceptionHookPoint( InterceptorBatchMethodsImpl* interceptor_methods) { // The contents of the SendMessage value that was previously set // has had its references stolen by core's operations - interceptor_methods->SetSendMessage(nullptr); + interceptor_methods->SetSendMessage(nullptr, nullptr); } void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { @@ -336,6 +348,7 @@ class CallOpSendMessage { } private: + const void* msg_ = nullptr; // The original non-serialized message bool hijacked_ = false; ByteBuffer send_buf_; WriteOptions write_options_; @@ -362,6 +375,19 @@ Status CallOpSendMessage::SendMessage(const M& message) { return SendMessage(message, WriteOptions()); } +template +Status CallOpSendMessage::SendMessagePtr(const M* message, + WriteOptions options) { + msg_ = message; + return SendMessage(*message, options); +} + +template +Status CallOpSendMessage::SendMessagePtr(const M* message) { + msg_ = message; + return SendMessage(*message, WriteOptions()); +} + template class CallOpRecvMessage { public: diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 66cf9b7754c..c20e8458106 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -73,7 +73,7 @@ class CallbackUnaryCallImpl { CallbackWithStatusTag(call.call(), on_completion, ops); // TODO(vjpai): Unify code with sync API as much as possible - Status s = ops->SendMessage(*request); + Status s = ops->SendMessagePtr(request); if (!s.ok()) { tag->force_run(s); return; @@ -341,7 +341,7 @@ class ClientCallbackReaderWriterImpl start_corked_ = false; } // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg).ok()); if (options.is_last_message()) { options.set_buffer_hint(); @@ -524,7 +524,7 @@ class ClientCallbackReaderImpl : context_(context), call_(call), reactor_(reactor) { this->BindReactor(reactor); // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(start_ops_.SendMessage(*request).ok()); + GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok()); start_ops_.ClientSendClose(); } @@ -650,7 +650,7 @@ class ClientCallbackWriterImpl start_corked_ = false; } // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*msg).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg).ok()); if (options.is_last_message()) { options.set_buffer_hint(); diff --git a/include/grpcpp/impl/codegen/client_unary_call.h b/include/grpcpp/impl/codegen/client_unary_call.h index 5151839412b..b9f8e1663f1 100644 --- a/include/grpcpp/impl/codegen/client_unary_call.h +++ b/include/grpcpp/impl/codegen/client_unary_call.h @@ -57,7 +57,7 @@ class BlockingUnaryCallImpl { CallOpRecvInitialMetadata, CallOpRecvMessage, CallOpClientSendClose, CallOpClientRecvStatus> ops; - status_ = ops.SendMessage(request); + status_ = ops.SendMessagePtr(&request); if (!status_.ok()) { return; } diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index 46175cd73b8..5a9a3a44e6e 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -109,7 +109,13 @@ class InterceptorBatchMethods { /// Returns a modifable ByteBuffer holding the serialized form of the message /// that is going to be sent. Valid for PRE_SEND_MESSAGE interceptions. /// A return value of nullptr indicates that this ByteBuffer is not valid. - virtual ByteBuffer* GetSendMessage() = 0; + virtual ByteBuffer* GetSerializedSendMessage() = 0; + + /// Returns a non-modifiable pointer to the original non-serialized form of + /// the message. Valid for PRE_SEND_MESSAGE interceptions. A return value of + /// nullptr indicates that this field is not valid. Also note that this is + /// only supported for sync and callback APIs at the present moment. + virtual const void* GetSendMessage() = 0; /// Returns a modifiable multimap of the initial metadata to be sent. Valid /// for PRE_SEND_INITIAL_METADATA interceptions. A value of nullptr indicates diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h index d0aa23cb0a0..4b7eaefee1b 100644 --- a/include/grpcpp/impl/codegen/interceptor_common.h +++ b/include/grpcpp/impl/codegen/interceptor_common.h @@ -79,7 +79,9 @@ class InterceptorBatchMethodsImpl hooks_[static_cast(type)] = true; } - ByteBuffer* GetSendMessage() override { return send_message_; } + ByteBuffer* GetSerializedSendMessage() override { return send_message_; } + + const void* GetSendMessage() override { return orig_send_message_; } std::multimap* GetSendInitialMetadata() override { return send_initial_metadata_; @@ -115,7 +117,10 @@ class InterceptorBatchMethodsImpl return recv_trailing_metadata_->map(); } - void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; } + void SetSendMessage(ByteBuffer* buf, const void* msg) { + send_message_ = buf; + orig_send_message_ = msg; + } void SetSendInitialMetadata( std::multimap* metadata) { @@ -334,6 +339,7 @@ class InterceptorBatchMethodsImpl std::function callback_; ByteBuffer* send_message_ = nullptr; + const void* orig_send_message_ = nullptr; std::multimap* send_initial_metadata_; @@ -379,13 +385,21 @@ class CancelInterceptorBatchMethods "Cancel notification"); } - ByteBuffer* GetSendMessage() override { + ByteBuffer* GetSerializedSendMessage() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendMessage on a method which " "has a Cancel notification"); return nullptr; } + const void* GetSendMessage() override { + GPR_CODEGEN_ASSERT( + false && + "It is illegal to call GetOriginalSendMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + std::multimap* GetSendInitialMetadata() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendInitialMetadata on a " diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index dd53f975f68..094286294c2 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -79,7 +79,7 @@ class RpcMethodHandler : public MethodHandler { ops.set_compression_level(param.server_context->compression_level()); } if (status.ok()) { - status = ops.SendMessage(rsp); + status = ops.SendMessagePtr(&rsp); } ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -139,7 +139,7 @@ class ClientStreamingHandler : public MethodHandler { } } if (status.ok()) { - status = ops.SendMessage(rsp); + status = ops.SendMessagePtr(&rsp); } ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index 1854f6ef2f5..a0e59215dd6 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -320,7 +320,7 @@ class CallbackUnaryHandler : public MethodHandler { // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessage(resp_)); + finish_ops_.SendMessagePtr(&resp_)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } @@ -449,7 +449,7 @@ class CallbackClientStreamingHandler : public MethodHandler { // The response is dropped if the status is not OK. if (s.ok()) { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, - finish_ops_.SendMessage(resp_)); + finish_ops_.SendMessagePtr(&resp_)); } else { finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); } @@ -642,7 +642,7 @@ class CallbackServerStreamingHandler : public MethodHandler { ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } @@ -652,7 +652,7 @@ class CallbackServerStreamingHandler : public MethodHandler { // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } @@ -804,7 +804,7 @@ class CallbackBidiHandler : public MethodHandler { ctx_->sent_initial_metadata_ = true; } // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); call_.PerformOps(&write_ops_); } @@ -813,7 +813,7 @@ class CallbackBidiHandler : public MethodHandler { // Don't send any message if the status is bad if (s.ok()) { // TODO(vjpai): don't assert - GPR_CODEGEN_ASSERT(finish_ops_.SendMessage(*resp, options).ok()); + GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); } Finish(std::move(s)); } diff --git a/include/grpcpp/impl/codegen/sync_stream.h b/include/grpcpp/impl/codegen/sync_stream.h index 6981076f04e..d9edad42153 100644 --- a/include/grpcpp/impl/codegen/sync_stream.h +++ b/include/grpcpp/impl/codegen/sync_stream.h @@ -253,7 +253,7 @@ class ClientReader final : public ClientReaderInterface { ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags()); // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok()); ops.ClientSendClose(); call_.PerformOps(&ops); cq_.Pluck(&ops); @@ -331,7 +331,7 @@ class ClientWriter : public ClientWriterInterface { context_->initial_metadata_flags()); context_->set_initial_metadata_corked(false); } - if (!ops.SendMessage(msg, options).ok()) { + if (!ops.SendMessagePtr(&msg, options).ok()) { return false; } @@ -502,7 +502,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { context_->initial_metadata_flags()); context_->set_initial_metadata_corked(false); } - if (!ops.SendMessage(msg, options).ok()) { + if (!ops.SendMessagePtr(&msg, options).ok()) { return false; } @@ -656,7 +656,7 @@ class ServerWriter final : public ServerWriterInterface { options.set_buffer_hint(); } - if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { @@ -734,7 +734,7 @@ class ServerReaderWriterBody final { if (options.is_last_message()) { options.set_buffer_hint(); } - if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { + if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 8abf4eb3f49..3db709e956e 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -68,7 +68,7 @@ class HijackingInterceptor : public experimental::Interceptor { if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { EchoRequest req; - auto* buffer = methods->GetSendMessage(); + auto* buffer = methods->GetSerializedSendMessage(); auto copied_buffer = *buffer; EXPECT_TRUE( SerializationTraits::Deserialize(&copied_buffer, &req) @@ -173,7 +173,7 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor { if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { EchoRequest req; - auto* buffer = methods->GetSendMessage(); + auto* buffer = methods->GetSerializedSendMessage(); auto copied_buffer = *buffer; EXPECT_TRUE( SerializationTraits::Deserialize(&copied_buffer, &req) @@ -287,12 +287,16 @@ class LoggingInterceptor : public experimental::Interceptor { if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { EchoRequest req; - auto* buffer = methods->GetSendMessage(); + auto* buffer = methods->GetSerializedSendMessage(); auto copied_buffer = *buffer; EXPECT_TRUE( SerializationTraits::Deserialize(&copied_buffer, &req) .ok()); - EXPECT_TRUE(req.message().find("Hello") == 0); + EXPECT_TRUE(req.message().find("Hello") == 0u); + EXPECT_EQ(static_cast(methods->GetSendMessage()) + ->message() + .find("Hello"), + 0u); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { @@ -308,7 +312,7 @@ class LoggingInterceptor : public experimental::Interceptor { experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) { EchoResponse* resp = static_cast(methods->GetRecvMessage()); - EXPECT_TRUE(resp->message().find("Hello") == 0); + EXPECT_TRUE(resp->message().find("Hello") == 0u); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_STATUS)) { diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index 53d8c4dc960..09e855b0d01 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -73,7 +73,7 @@ class LoggingInterceptor : public experimental::Interceptor { type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)); } - virtual void Intercept(experimental::InterceptorBatchMethods* methods) { + void Intercept(experimental::InterceptorBatchMethods* methods) override { if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { auto* map = methods->GetSendInitialMetadata(); @@ -83,7 +83,7 @@ class LoggingInterceptor : public experimental::Interceptor { if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { EchoRequest req; - auto* buffer = methods->GetSendMessage(); + auto* buffer = methods->GetSerializedSendMessage(); auto copied_buffer = *buffer; EXPECT_TRUE( SerializationTraits::Deserialize(&copied_buffer, &req) @@ -142,6 +142,32 @@ class LoggingInterceptorFactory } }; +// Test if GetSendMessage works as expected +class GetSendMessageTester : public experimental::Interceptor { + public: + GetSendMessageTester(experimental::ServerRpcInfo* info) {} + + void Intercept(experimental::InterceptorBatchMethods* methods) override { + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { + EXPECT_EQ(static_cast(methods->GetSendMessage()) + ->message() + .find("Hello"), + 0u); + } + methods->Proceed(); + } +}; + +class GetSendMessageTesterFactory + : public experimental::ServerInterceptorFactoryInterface { + public: + virtual experimental::Interceptor* CreateServerInterceptor( + experimental::ServerRpcInfo* info) override { + return new GetSendMessageTester(info); + } +}; + void MakeBidiStreamingCall(const std::shared_ptr& channel) { auto stub = grpc::testing::EchoTestService::NewStub(channel); ClientContext ctx; @@ -176,6 +202,9 @@ class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test { creators.push_back( std::unique_ptr( new LoggingInterceptorFactory())); + creators.push_back( + std::unique_ptr( + new GetSendMessageTesterFactory())); // Add 20 dummy interceptor factories and null interceptor factories for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr( @@ -216,6 +245,9 @@ class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test { creators.push_back( std::unique_ptr( new LoggingInterceptorFactory())); + creators.push_back( + std::unique_ptr( + new GetSendMessageTesterFactory())); for (auto i = 0; i < 20; i++) { creators.push_back(std::unique_ptr( new DummyInterceptorFactory()));