diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index c0de5ed6025..4ca87a99fca 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -877,6 +878,8 @@ class CallOpSet : public CallOpSetInterface, bool FinalizeResult(void** tag, bool* status) override { if (done_intercepting_) { + // Complete the avalanching since we are done with this batch of ops + call_.cq()->CompleteAvalanching(); // We have already finished intercepting and filling in the results. This // round trip from the core needed to be made because interceptors were // run @@ -961,6 +964,12 @@ class CallOpSet : public CallOpSetInterface, this->Op4::SetInterceptionHookPoint(&interceptor_methods_); this->Op5::SetInterceptionHookPoint(&interceptor_methods_); this->Op6::SetInterceptionHookPoint(&interceptor_methods_); + if (interceptor_methods_.InterceptorsListEmpty()) { + return true; + } + // This call will go through interceptors and would need to + // schedule new batches, so delay completion queue shutdown + call_.cq()->RegisterAvalanching(); return interceptor_methods_.RunInterceptors(); } // Returns true if no interceptors need to be run diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index fb38788f7d6..4812f0253d4 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -84,6 +84,8 @@ template class ErrorMethodHandler; template class BlockingUnaryCallImpl; +template +class CallOpSet; } // namespace internal extern CoreCodegenInterface* g_core_codegen_interface; @@ -278,6 +280,10 @@ class CompletionQueue : private GrpcLibraryCodegen { // Friends that need access to constructor for callback CQ friend class ::grpc::Channel; + // For access to Register/CompleteAvalanching + template + friend class ::grpc::internal::CallOpSet; + /// EXPERIMENTAL /// Creates a Thread Local cache to store the first event /// On this completion queue queued from this thread. Once @@ -361,7 +367,12 @@ class CompletionQueue : private GrpcLibraryCodegen { gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, static_cast(1)); } - void CompleteAvalanching(); + void CompleteAvalanching() { + if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(-1)) == 1) { + g_core_codegen_interface->grpc_completion_queue_shutdown(cq_); + } + } grpc_completion_queue* cq_; // owned diff --git a/include/grpcpp/impl/codegen/core_codegen.h b/include/grpcpp/impl/codegen/core_codegen.h index 6ef184d01ab..b7ddb0c791c 100644 --- a/include/grpcpp/impl/codegen/core_codegen.h +++ b/include/grpcpp/impl/codegen/core_codegen.h @@ -42,6 +42,7 @@ class CoreCodegen final : public CoreCodegenInterface { void* reserved) override; grpc_completion_queue* grpc_completion_queue_create_for_pluck( void* reserved) override; + void grpc_completion_queue_shutdown(grpc_completion_queue* cq) override; void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, diff --git a/include/grpcpp/impl/codegen/core_codegen_interface.h b/include/grpcpp/impl/codegen/core_codegen_interface.h index 20a5b3300c4..1d92b4f0dff 100644 --- a/include/grpcpp/impl/codegen/core_codegen_interface.h +++ b/include/grpcpp/impl/codegen/core_codegen_interface.h @@ -52,6 +52,7 @@ class CoreCodegenInterface { void* reserved) = 0; virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck( void* reserved) = 0; + virtual void grpc_completion_queue_shutdown(grpc_completion_queue* cq) = 0; virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0; virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h index 09721343ffa..8ed84230911 100644 --- a/include/grpcpp/impl/codegen/interceptor_common.h +++ b/include/grpcpp/impl/codegen/interceptor_common.h @@ -219,10 +219,29 @@ class InterceptorBatchMethodsImpl // Alternatively, RunInterceptors(std::function f) can be used. void SetCallOpSetInterface(CallOpSetInterface* ops) { ops_ = ops; } - // Returns true if no interceptors are run. This should be used only by - // subclasses of CallOpSetInterface. SetCall and SetCallOpSetInterface should - // have been called before this. After all the interceptors are done running, - // either ContinueFillOpsAfterInterception or + // SetCall should have been called before this. + // Returns true if the interceptors list is empty + bool InterceptorsListEmpty() { + auto* client_rpc_info = call_->client_rpc_info(); + if (client_rpc_info != nullptr) { + if (client_rpc_info->interceptors_.size() == 0) { + return true; + } else { + return false; + } + } + + auto* server_rpc_info = call_->server_rpc_info(); + if (server_rpc_info == nullptr || + server_rpc_info->interceptors_.size() == 0) { + return true; + } + return false; + } + + // This should be used only by subclasses of CallOpSetInterface. SetCall and + // SetCallOpSetInterface should have been called before this. After all the + // interceptors are done running, either ContinueFillOpsAfterInterception or // ContinueFinalizeOpsAfterInterception will be called. Note that neither of // them is invoked if there were no interceptors registered. bool RunInterceptors() { diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index d93a54aed71..4bb3bcbd8b6 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -42,14 +42,6 @@ void CompletionQueue::Shutdown() { CompleteAvalanching(); } -void CompletionQueue::CompleteAvalanching() { - // Check if this was the last avalanching operation - if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, - static_cast(-1)) == 1) { - grpc_completion_queue_shutdown(cq_); - } -} - CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( void** tag, bool* ok, gpr_timespec deadline) { for (;;) { diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index cfaa2e7b193..9430dcc9881 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -59,6 +59,10 @@ grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck( return ::grpc_completion_queue_create_for_pluck(reserved); } +void CoreCodegen::grpc_completion_queue_shutdown(grpc_completion_queue* cq) { + ::grpc_completion_queue_shutdown(cq); +} + void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) { ::grpc_completion_queue_destroy(cq); } diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index 82f142ba913..028191c93c3 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -504,7 +504,8 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { new DummyInterceptorFactory())); } builder.experimental().SetInterceptorCreators(std::move(creators)); - auto cq = builder.AddCompletionQueue(); + auto srv_cq = builder.AddCompletionQueue(); + CompletionQueue cli_cq; auto server = builder.BuildAndStart(); ChannelArguments args; @@ -527,28 +528,28 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { cli_ctx.AddMetadata("testkey", "testvalue"); std::unique_ptr call = - generic_stub.PrepareCall(&cli_ctx, kMethodName, cq.get()); + generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq); call->StartCall(tag(1)); - Verifier().Expect(1, true).Verify(cq.get()); + Verifier().Expect(1, true).Verify(&cli_cq); std::unique_ptr send_buffer = SerializeToByteBuffer(&send_request); call->Write(*send_buffer, tag(2)); // Send ByteBuffer can be destroyed after calling Write. send_buffer.reset(); - Verifier().Expect(2, true).Verify(cq.get()); + Verifier().Expect(2, true).Verify(&cli_cq); call->WritesDone(tag(3)); - Verifier().Expect(3, true).Verify(cq.get()); + Verifier().Expect(3, true).Verify(&cli_cq); - service.RequestCall(&srv_ctx, &stream, cq.get(), cq.get(), tag(4)); + service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4)); - Verifier().Expect(4, true).Verify(cq.get()); + Verifier().Expect(4, true).Verify(srv_cq.get()); EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue")); srv_ctx.AddTrailingMetadata("testkey", "testvalue"); ByteBuffer recv_buffer; stream.Read(&recv_buffer, tag(5)); - Verifier().Expect(5, true).Verify(cq.get()); + Verifier().Expect(5, true).Verify(srv_cq.get()); EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -556,18 +557,23 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { send_buffer = SerializeToByteBuffer(&send_response); stream.Write(*send_buffer, tag(6)); send_buffer.reset(); - Verifier().Expect(6, true).Verify(cq.get()); + Verifier().Expect(6, true).Verify(srv_cq.get()); stream.Finish(Status::OK, tag(7)); - Verifier().Expect(7, true).Verify(cq.get()); + // Shutdown srv_cq before we try to get the tag back, to verify that the + // interception API handles completion queue shutdowns that take place before + // all the tags are returned + srv_cq->Shutdown(); + Verifier().Expect(7, true).Verify(srv_cq.get()); recv_buffer.Clear(); call->Read(&recv_buffer, tag(8)); - Verifier().Expect(8, true).Verify(cq.get()); + Verifier().Expect(8, true).Verify(&cli_cq); EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); call->Finish(&recv_status, tag(9)); - Verifier().Expect(9, true).Verify(cq.get()); + cli_cq.Shutdown(); + Verifier().Expect(9, true).Verify(&cli_cq); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -578,10 +584,11 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); server->Shutdown(); - cq->Shutdown(); void* ignored_tag; bool ignored_ok; - while (cq->Next(&ignored_tag, &ignored_ok)) + while (cli_cq.Next(&ignored_tag, &ignored_ok)) + ; + while (srv_cq->Next(&ignored_tag, &ignored_ok)) ; grpc_recycle_unused_port(port); }