diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index 51cc18e2b17..1fc8481bc6e 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -1005,8 +1005,8 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { } else { if (rpc_info->hijacked_) { curr_iteration_ = rpc_info->hijacked_interceptor_; - gpr_log(GPR_ERROR, "running from the hijacked %d", - rpc_info->hijacked_interceptor_); + // gpr_log(GPR_ERROR, "running from the hijacked %d", + // rpc_info->hijacked_interceptor_); } else { curr_iteration_ = rpc_info->interceptors_.size() - 1; } @@ -1165,6 +1165,7 @@ class CallOpSet : public CallOpSetInterface, } void FillOps(Call* call) override { + // gpr_log(GPR_ERROR, "filling ops %p", this); done_intercepting_ = false; g_core_codegen_interface->grpc_call_ref(call->call()); call_ = @@ -1179,10 +1180,12 @@ class CallOpSet : public CallOpSetInterface, } bool FinalizeResult(void** tag, bool* status) override { + // gpr_log(GPR_ERROR, "finalizing result %p", this); if (done_intercepting_) { // We have already finished intercepting and filling in the results. This // round trip from the core needed to be made because interceptors were // run + // gpr_log(GPR_ERROR, "done intercepting"); *tag = return_tag_; g_core_codegen_interface->grpc_call_unref(call_.call()); return true; @@ -1194,13 +1197,15 @@ class CallOpSet : public CallOpSetInterface, this->Op4::FinishOp(status); this->Op5::FinishOp(status); this->Op6::FinishOp(status); + // gpr_log(GPR_ERROR, "done finish ops"); if (RunInterceptorsPostRecv()) { *tag = return_tag_; g_core_codegen_interface->grpc_call_unref(call_.call()); + // gpr_log(GPR_ERROR, "no interceptors"); return true; } - + // gpr_log(GPR_ERROR, "running interceptors"); // Interceptors are going to be run, so we can't return the tag just yet. // After the interceptors are run, ContinueFinalizeResultAfterInterception return false; @@ -1238,6 +1243,7 @@ class CallOpSet : public CallOpSetInterface, this->Op4::AddOp(ops, &nops); this->Op5::AddOp(ops, &nops); this->Op6::AddOp(ops, &nops); + // gpr_log(GPR_ERROR, "going to start call batch %p", this); GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( call_.call(), ops, nops, cq_tag(), nullptr)); diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index ca2f867d04d..3f586b2394d 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -94,7 +94,10 @@ class CallbackWithStatusTag void Run(bool ok) { void* ignored = ops_; - GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &ok)); + if (!ops_->FinalizeResult(&ignored, &ok)) { + // The tag was swallowed + return; + } GPR_CODEGEN_ASSERT(ignored == ops_); // Last use of func_ or status_, so ok to move them out diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 4a3a8b859a8..81a0d493023 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -70,19 +70,31 @@ class DummyInterceptor : public experimental::Interceptor { if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { num_times_run_++; + } else if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA)) { + num_times_run_reverse_++; } methods->Proceed(); } - static void Reset() { num_times_run_.store(0); } + static void Reset() { + num_times_run_.store(0); + num_times_run_reverse_.store(0); + } - static int GetNumTimesRun() { return num_times_run_.load(); } + static int GetNumTimesRun() { + EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load()); + return num_times_run_.load(); + } private: static std::atomic num_times_run_; + static std::atomic num_times_run_reverse_; }; std::atomic DummyInterceptor::num_times_run_; +std::atomic DummyInterceptor::num_times_run_reverse_; class DummyInterceptorFactory : public experimental::ClientInterceptorFactoryInterface { @@ -208,7 +220,6 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor { virtual void Intercept(experimental::InterceptorBatchMethods* methods) { gpr_log(GPR_ERROR, "ran this"); - bool hijack = false; if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { auto* map = methods->GetSendInitialMetadata(); @@ -217,7 +228,6 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor { auto iterator = map->begin(); EXPECT_EQ("testkey", iterator->first); EXPECT_EQ("testvalue", iterator->second); - hijack = true; // Make a copy of the map metadata_map_ = *map; } @@ -228,15 +238,20 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor { auto copied_buffer = *buffer; SerializationTraits::Deserialize(&copied_buffer, &req); EXPECT_EQ(req.message(), "Hello"); + req_ = req; auto stub = grpc::testing::EchoTestService::NewStub( methods->GetInterceptedChannel()); - ClientContext ctx; - EchoResponse resp; - ctx.AddMetadata(metadata_map_.begin()->first, - metadata_map_.begin()->second); - Status s = stub->Echo(&ctx, req, &resp); - EXPECT_EQ(s.ok(), true); - EXPECT_EQ(resp.message(), "Hello"); + ctx_.AddMetadata(metadata_map_.begin()->first, + metadata_map_.begin()->second); + stub->experimental_async()->Echo(&ctx_, &req_, &resp_, + [this, &methods](Status s) { + EXPECT_EQ(s.ok(), true); + EXPECT_EQ(resp_.message(), "Hello"); + methods->Hijack(); + }); + // There isn't going to be any other interesting operation in this batch, + // so it is fine to return + return; } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { @@ -254,8 +269,7 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor { static_cast(methods->GetRecvMessage()); // Check that we got the hijacked message, and re-insert the expected // message - EXPECT_EQ(resp->message(), "Hello1"); - resp->set_message("Hello"); + EXPECT_EQ(resp->message(), "Hello"); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_STATUS)) { @@ -282,28 +296,27 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor { // Insert a different message than expected EchoResponse* resp = static_cast(methods->GetRecvMessage()); - resp->set_message("Hello1"); + resp->set_message(resp_.message()); } if (methods->QueryInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_RECV_STATUS)) { auto* map = methods->GetRecvTrailingMetadata(); // insert the metadata that we want EXPECT_EQ(map->size(), 0); - map->insert(std::make_pair("testkey", "testvalue")); + *map = ctx_.GetServerTrailingMetadata(); auto* status = methods->GetRecvStatus(); *status = Status(StatusCode::OK, ""); } - if (hijack) { - gpr_log(GPR_ERROR, "hijacking"); - methods->Hijack(); - } else { - methods->Proceed(); - } + + methods->Proceed(); } private: experimental::ClientRpcInfo* info_; std::multimap metadata_map_; + ClientContext ctx_; + EchoRequest req_; + EchoResponse resp_; }; class HijackingInterceptorMakesAnotherCallFactory @@ -401,6 +414,32 @@ void MakeCall(std::shared_ptr channel) { EXPECT_EQ(resp.message(), "Hello"); } +void MakeCallbackCall(std::shared_ptr channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + std::mutex mu; + std::condition_variable cv; + bool done = false; + req.mutable_param()->set_echo_metadata(true); + ctx.AddMetadata("testkey", "testvalue"); + req.set_message("Hello"); + EchoResponse resp; + stub->experimental_async()->Echo(&ctx, &req, &resp, + [&resp, &mu, &done, &cv](Status s) { + gpr_log(GPR_ERROR, "got the callback"); + EXPECT_EQ(s.ok(), true); + EXPECT_EQ(resp.message(), "Hello"); + std::lock_guard l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } +} + TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLoggingTest) { ChannelArguments args; DummyInterceptor::Reset(); @@ -444,16 +483,7 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingTest) { auto channel = experimental::CreateCustomChannelWithInterceptors( server_address_, InsecureChannelCredentials(), args, std::move(creators)); - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - req.mutable_param()->set_echo_metadata(true); - ctx.AddMetadata("testkey", "testvalue"); - req.set_message("Hello"); - EchoResponse resp; - Status s = stub->Echo(&ctx, req, &resp); - EXPECT_EQ(s.ok(), true); - EXPECT_EQ(resp.message(), "Hello"); + MakeCall(channel); // Make sure only 20 dummy interceptors were run EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } @@ -471,16 +501,7 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLogThenHijackTest) { auto channel = experimental::CreateCustomChannelWithInterceptors( server_address_, InsecureChannelCredentials(), args, std::move(creators)); - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - req.mutable_param()->set_echo_metadata(true); - ctx.AddMetadata("testkey", "testvalue"); - req.set_message("Hello"); - EchoResponse resp; - Status s = stub->Echo(&ctx, req, &resp); - EXPECT_EQ(s.ok(), true); - EXPECT_EQ(resp.message(), "Hello"); + MakeCall(channel); } TEST_F(ClientInterceptorsEnd2endTest, @@ -491,35 +512,48 @@ TEST_F(ClientInterceptorsEnd2endTest, std::unique_ptr>>( new std::vector< std::unique_ptr>()); - // Add 20 dummy interceptors before hijacking interceptor - for (auto i = 0; i < 20; i++) { + // Add 5 dummy interceptors before hijacking interceptor + for (auto i = 0; i < 5; i++) { creators->push_back(std::unique_ptr( new DummyInterceptorFactory())); } creators->push_back( std::unique_ptr( new HijackingInterceptorMakesAnotherCallFactory())); - // Add 20 dummy interceptors after hijacking interceptor - for (auto i = 0; i < 20; i++) { + // Add 7 dummy interceptors after hijacking interceptor + for (auto i = 0; i < 7; i++) { creators->push_back(std::unique_ptr( new DummyInterceptorFactory())); } auto channel = experimental::CreateCustomChannelWithInterceptors( server_address_, InsecureChannelCredentials(), args, std::move(creators)); - auto stub = grpc::testing::EchoTestService::NewStub(channel); - ClientContext ctx; - EchoRequest req; - req.mutable_param()->set_echo_metadata(true); - ctx.AddMetadata("testkey", "testvalue"); - req.set_message("Hello"); - EchoResponse resp; - Status s = stub->Echo(&ctx, req, &resp); - EXPECT_EQ(s.ok(), true); - EXPECT_EQ(resp.message(), "Hello"); + MakeCall(channel); // Make sure all interceptors were run once, since the hijacking interceptor // makes an RPC on the intercepted channel - EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 40); + EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 12); +} + +TEST_F(ClientInterceptorsEnd2endTest, + ClientInterceptorLoggingTestWithCallback) { + ChannelArguments args; + DummyInterceptor::Reset(); + auto creators = std::unique_ptr>>( + new std::vector< + std::unique_ptr>()); + creators->push_back(std::unique_ptr( + new LoggingInterceptorFactory())); + // Add 20 dummy interceptors + for (auto i = 0; i < 20; i++) { + creators->push_back(std::unique_ptr( + new DummyInterceptorFactory())); + } + auto channel = experimental::CreateCustomChannelWithInterceptors( + server_address_, InsecureChannelCredentials(), args, std::move(creators)); + MakeCallbackCall(channel); + // Make sure all 20 dummy interceptors were run + EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } } // namespace