From 1cdc78cb9e661a18f49e5265a1ceb469f7223d59 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 28 Jul 2020 09:54:17 -0700 Subject: [PATCH 1/4] Revert "Merge pull request #23562 from yang-g/one_core" This reverts commit f614b66a9833b1d223dbe12d0f8105ea5432cd23, reversing changes made to 5d803dcd3f24cbac609fa2390ff897b3243f14d2. --- src/core/lib/surface/completion_queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index bf99128e5b5..c59e329fabe 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -364,7 +364,7 @@ struct cq_callback_alternative_data { if (shared_cq_next == nullptr) { shared_cq_next = grpc_completion_queue_create_for_next(nullptr); - int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 1, 32); + int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 2, 32); threads_remaining.Store(num_nexting_threads, grpc_core::MemoryOrder::RELEASE); for (int i = 0; i < num_nexting_threads; i++) { From afa2bd836788e3067791a804479178adae13f2a2 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 28 Jul 2020 09:54:36 -0700 Subject: [PATCH 2/4] Revert "Merge pull request #23435 from vjpai/fix_iomgr_non_polling" This reverts commit 2d73a17587193380686ae36fc170962f286591f9, reversing changes made to 08d737abf8d8dac95a3c28e8deb2d744e2ce101d. --- src/core/lib/iomgr/iomgr.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index 9c97823dd6f..d7da7c9151d 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -31,7 +31,6 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/global_config.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/buffer_list.h" @@ -51,7 +50,7 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; static bool g_grpc_abort_on_leaks; -static grpc_core::Atomic g_iomgr_non_polling{false}; +static bool g_iomgr_non_polling; void grpc_iomgr_init() { grpc_core::ExecCtx exec_ctx; @@ -196,9 +195,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object* obj) { bool grpc_iomgr_abort_on_leaks(void) { return g_grpc_abort_on_leaks; } bool grpc_iomgr_non_polling() { - return g_iomgr_non_polling.Load(grpc_core::MemoryOrder::SEQ_CST); + gpr_mu_lock(&g_mu); + bool ret = g_iomgr_non_polling; + gpr_mu_unlock(&g_mu); + return ret; } void grpc_iomgr_mark_non_polling_internal() { - g_iomgr_non_polling.Store(true, grpc_core::MemoryOrder::SEQ_CST); + gpr_mu_lock(&g_mu); + g_iomgr_non_polling = true; + gpr_mu_unlock(&g_mu); } From bcb65307179000b6ce64428dd6db8a58dffc5d86 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 28 Jul 2020 09:55:17 -0700 Subject: [PATCH 3/4] Revert "Merge pull request #23430 from vjpai/remove_do_not_test" This reverts commit 08d737abf8d8dac95a3c28e8deb2d744e2ce101d, reversing changes made to 6dc44b33c79a39dd072dc385617fc9470b161578. --- .../end2end/client_callback_end2end_test.cc | 62 ++++++++++++- test/cpp/end2end/end2end_test.cc | 86 ++++++++++++++++++- .../end2end/message_allocator_end2end_test.cc | 33 ++++++- 3 files changed, 177 insertions(+), 4 deletions(-) diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 8a2d469ddf6..63e726d0cde 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -45,6 +45,17 @@ #include "test/cpp/util/string_ref_helper.h" #include "test/cpp/util/test_credentials_provider.h" +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, any +// callback tests can only be run if the iomgr can run in the background or if +// the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + namespace grpc { namespace testing { namespace { @@ -119,6 +130,10 @@ class ClientCallbackEnd2endTest server_ = builder.BuildAndStart(); is_server_started_ = true; + // if (GetParam().protocol == Protocol::TCP && + // !grpc_iomgr_run_in_background()) { + // do_not_test_ = true; + // } } void ResetStub() { @@ -352,6 +367,7 @@ class ClientCallbackEnd2endTest rpc.Await(); } } + bool do_not_test_{false}; bool is_server_started_{false}; int picked_port_{0}; std::shared_ptr channel_; @@ -364,11 +380,13 @@ class ClientCallbackEnd2endTest }; TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(1, false); } TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; @@ -403,6 +421,7 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) { } TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) { + MAYBE_SKIP_TEST; ResetStub(); std::mutex mu1, mu2, mu3; std::condition_variable cv; @@ -453,6 +472,7 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) { } TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) { + MAYBE_SKIP_TEST; ResetStub(); std::mutex mu; std::condition_variable cv; @@ -480,16 +500,19 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) { } TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(10, false); } TEST_P(ClientCallbackEnd2endTest, SequentialRpcsRawReq) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcsRawReq(10); } TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) { + MAYBE_SKIP_TEST; ResetStub(); SimpleRequest request; SimpleResponse response; @@ -516,43 +539,51 @@ TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) { } TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(1, true); } TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(10, true); } TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcsGeneric(10, false); } TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) { + MAYBE_SKIP_TEST; ResetStub(); SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true); } TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) { + MAYBE_SKIP_TEST; ResetStub(); SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true); } TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) { + MAYBE_SKIP_TEST; ResetStub(); SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false); } #if GRPC_ALLOW_EXCEPTIONS TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcsGeneric(10, true); } #endif TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -565,6 +596,7 @@ TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { } TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -577,6 +609,7 @@ TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) { } TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -605,6 +638,7 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { } TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -735,6 +769,7 @@ class WriteClient : public grpc::experimental::ClientWriteReactor { }; TEST_P(ClientCallbackEnd2endTest, RequestStream) { + MAYBE_SKIP_TEST; ResetStub(); WriteClient test{stub_.get(), DO_NOT_CANCEL, 3}; test.Await(); @@ -745,6 +780,7 @@ TEST_P(ClientCallbackEnd2endTest, RequestStream) { } TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) { + MAYBE_SKIP_TEST; ResetStub(); WriteClient test{stub_.get(), DO_NOT_CANCEL, 3, ClientCancelInfo{2}}; test.Await(); @@ -756,6 +792,7 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) { // Server to cancel before doing reading the request TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) { + MAYBE_SKIP_TEST; ResetStub(); WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1}; test.Await(); @@ -767,6 +804,7 @@ TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) { // Server to cancel while reading a request from the stream in parallel TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) { + MAYBE_SKIP_TEST; ResetStub(); WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10}; test.Await(); @@ -779,6 +817,7 @@ TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) { // Server to cancel after reading all the requests but before returning to the // client TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) { + MAYBE_SKIP_TEST; ResetStub(); WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4}; test.Await(); @@ -789,6 +828,7 @@ TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) { } TEST_P(ClientCallbackEnd2endTest, UnaryReactor) { + MAYBE_SKIP_TEST; ResetStub(); class UnaryClient : public grpc::experimental::ClientUnaryReactor { public: @@ -847,6 +887,7 @@ TEST_P(ClientCallbackEnd2endTest, UnaryReactor) { } TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) { + MAYBE_SKIP_TEST; ResetStub(); const std::string kMethodName("/grpc.testing.EchoTestService/Echo"); class UnaryClient : public grpc::experimental::ClientUnaryReactor { @@ -1012,6 +1053,7 @@ class ReadClient : public grpc::experimental::ClientReadReactor { }; TEST_P(ClientCallbackEnd2endTest, ResponseStream) { + MAYBE_SKIP_TEST; ResetStub(); ReadClient test{stub_.get(), DO_NOT_CANCEL}; test.Await(); @@ -1022,6 +1064,7 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStream) { } TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) { + MAYBE_SKIP_TEST; ResetStub(); ReadClient test{stub_.get(), DO_NOT_CANCEL, ClientCancelInfo{2}}; test.Await(); @@ -1031,6 +1074,7 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) { // Server to cancel before sending any response messages TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) { + MAYBE_SKIP_TEST; ResetStub(); ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING}; test.Await(); @@ -1042,6 +1086,7 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) { // Server to cancel while writing a response to the stream in parallel TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) { + MAYBE_SKIP_TEST; ResetStub(); ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING}; test.Await(); @@ -1054,6 +1099,7 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) { // Server to cancel after writing all the respones to the stream but before // returning to the client TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) { + MAYBE_SKIP_TEST; ResetStub(); ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING}; test.Await(); @@ -1218,6 +1264,7 @@ class BidiClient }; TEST_P(ClientCallbackEnd2endTest, BidiStream) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), DO_NOT_CANCEL, kServerDefaultResponseStreamsToSend, @@ -1230,6 +1277,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) { } TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), DO_NOT_CANCEL, kServerDefaultResponseStreamsToSend, @@ -1242,6 +1290,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) { } TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), DO_NOT_CANCEL, kServerDefaultResponseStreamsToSend, @@ -1254,6 +1303,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) { } TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), DO_NOT_CANCEL, kServerDefaultResponseStreamsToSend, @@ -1266,6 +1316,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) { } TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), DO_NOT_CANCEL, kServerDefaultResponseStreamsToSend, @@ -1280,6 +1331,7 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { // Server to cancel before reading/writing any requests/responses on the stream TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2, /*cork_metadata=*/false, /*first_write_async=*/false); @@ -1293,6 +1345,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { // Server to cancel while reading/writing requests/responses on the stream in // parallel TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING, /*num_msgs_to_send=*/10, /*cork_metadata=*/false, @@ -1307,6 +1360,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { // Server to cancel after reading/writing all requests/responses on the stream // but before returning to the client TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) { + MAYBE_SKIP_TEST; ResetStub(); BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5, /*cork_metadata=*/false, /*first_write_async=*/false); @@ -1318,6 +1372,7 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) { } TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) { + MAYBE_SKIP_TEST; ResetStub(); class Client : public grpc::experimental::ClientBidiReactor { @@ -1365,6 +1420,7 @@ TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) { } TEST_P(ClientCallbackEnd2endTest, UnimplementedRpc) { + MAYBE_SKIP_TEST; ChannelArguments args; const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials( GetParam().credentials_type, &args); @@ -1399,6 +1455,7 @@ TEST_P(ClientCallbackEnd2endTest, UnimplementedRpc) { TEST_P(ClientCallbackEnd2endTest, ResponseStreamExtraReactionFlowReadsUntilDone) { + MAYBE_SKIP_TEST; ResetStub(); class ReadAllIncomingDataClient : public grpc::experimental::ClientReadReactor { @@ -1527,5 +1584,8 @@ INSTANTIATE_TEST_SUITE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(argc, argv); - return RUN_ALL_TESTS(); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 82fbb0f2a6c..e6538344b03 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -62,6 +62,17 @@ using grpc::testing::EchoResponse; using grpc::testing::kTlsCredentialsType; using std::chrono::system_clock; +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, +// tests that use the callback server can only be run if the iomgr can run in +// the background or if the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + namespace grpc { namespace testing { namespace { @@ -316,6 +327,14 @@ class End2endTest : public ::testing::TestWithParam { GetParam().Log(); } + void SetUp() override { + // if (GetParam().callback_server && !GetParam().inproc && + // !grpc_iomgr_run_in_background()) { + // do_not_test_ = true; + // return; + // } + } + void TearDown() override { if (is_server_started_) { server_->Shutdown(); @@ -450,6 +469,7 @@ class End2endTest : public ::testing::TestWithParam { DummyInterceptor::Reset(); } + bool do_not_test_{false}; bool is_server_started_; std::shared_ptr channel_; std::unique_ptr stub_; @@ -505,6 +525,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestRequestStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { + MAYBE_SKIP_TEST; RestartServer(std::shared_ptr()); ResetStub(); EchoRequest request; @@ -583,6 +604,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestResponseStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel) { + MAYBE_SKIP_TEST; RestartServer(std::shared_ptr()); ResetStub(); EchoRequest request; @@ -664,6 +686,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, int num_messages) { + MAYBE_SKIP_TEST; RestartServer(std::shared_ptr()); ResetStub(); EchoRequest request; @@ -739,6 +762,7 @@ class End2endServerTryCancelTest : public End2endTest { }; TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -801,6 +825,7 @@ TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { } TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) { + MAYBE_SKIP_TEST; // User-Agent is an HTTP header for HTTP transports only if (GetParam().inproc) { return; @@ -824,6 +849,7 @@ TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) { } TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -836,6 +862,7 @@ TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { } TEST_P(End2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -848,6 +875,7 @@ TEST_P(End2endTest, MultipleRpcs) { } TEST_P(End2endTest, ManyStubs) { + MAYBE_SKIP_TEST; ResetStub(); ChannelTestPeer peer(channel_.get()); int registered_calls_pre = peer.registered_calls(); @@ -860,6 +888,7 @@ TEST_P(End2endTest, ManyStubs) { } TEST_P(End2endTest, EmptyBinaryMetadata) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -872,6 +901,7 @@ TEST_P(End2endTest, EmptyBinaryMetadata) { } TEST_P(End2endTest, ReconnectChannel) { + MAYBE_SKIP_TEST; if (GetParam().inproc) { return; } @@ -899,6 +929,7 @@ TEST_P(End2endTest, ReconnectChannel) { } TEST_P(End2endTest, RequestStreamOneRequest) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -915,6 +946,7 @@ TEST_P(End2endTest, RequestStreamOneRequest) { } TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -930,6 +962,7 @@ TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { } TEST_P(End2endTest, RequestStreamTwoRequests) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -946,6 +979,7 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { } TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -962,6 +996,7 @@ TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { } TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -978,6 +1013,7 @@ TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { } TEST_P(End2endTest, ResponseStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -996,6 +1032,7 @@ TEST_P(End2endTest, ResponseStream) { } TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1017,6 +1054,7 @@ TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { // This was added to prevent regression from issue: // https://github.com/grpc/grpc/issues/11546 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1038,6 +1076,7 @@ TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { } TEST_P(End2endTest, BidiStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1062,6 +1101,7 @@ TEST_P(End2endTest, BidiStream) { } TEST_P(End2endTest, BidiStreamWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1097,6 +1137,7 @@ TEST_P(End2endTest, BidiStreamWithCoalescingApi) { // This was added to prevent regression from issue: // https://github.com/grpc/grpc/issues/11546 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1122,6 +1163,7 @@ TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1150,6 +1192,7 @@ void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) { } TEST_P(End2endTest, CancelRpcBeforeStart) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1165,6 +1208,7 @@ TEST_P(End2endTest, CancelRpcBeforeStart) { } TEST_P(End2endTest, CancelRpcAfterStart) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1201,6 +1245,7 @@ TEST_P(End2endTest, CancelRpcAfterStart) { // Client cancels request stream after sending two messages TEST_P(End2endTest, ClientCancelsRequestStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1224,6 +1269,7 @@ TEST_P(End2endTest, ClientCancelsRequestStream) { // Client cancels server stream after sending some messages TEST_P(End2endTest, ClientCancelsResponseStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1259,6 +1305,7 @@ TEST_P(End2endTest, ClientCancelsResponseStream) { // Client cancels bidi stream after sending some messages TEST_P(End2endTest, ClientCancelsBidi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1294,6 +1341,7 @@ TEST_P(End2endTest, ClientCancelsBidi) { } TEST_P(End2endTest, RpcMaxMessageSize) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1316,6 +1364,7 @@ void ReaderThreadFunc(ClientReaderWriter* stream, // Run a Read and a WritesDone simultaneously. TEST_P(End2endTest, SimultaneousReadWritesDone) { + MAYBE_SKIP_TEST; ResetStub(); ClientContext context; gpr_event ev; @@ -1330,6 +1379,7 @@ TEST_P(End2endTest, SimultaneousReadWritesDone) { } TEST_P(End2endTest, ChannelState) { + MAYBE_SKIP_TEST; if (GetParam().inproc) { return; } @@ -1380,6 +1430,7 @@ TEST_P(End2endTest, ChannelStateTimeout) { // Talking to a non-existing service. TEST_P(End2endTest, NonExistingService) { + MAYBE_SKIP_TEST; ResetChannel(); std::unique_ptr stub; stub = grpc::testing::UnimplementedEchoService::NewStub(channel_); @@ -1397,6 +1448,7 @@ TEST_P(End2endTest, NonExistingService) { // Ask the server to send back a serialized proto in trailer. // This is an example of setting error details. TEST_P(End2endTest, BinaryTrailerTest) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1423,6 +1475,7 @@ TEST_P(End2endTest, BinaryTrailerTest) { } TEST_P(End2endTest, ExpectErrorTest) { + MAYBE_SKIP_TEST; ResetStub(); std::vector expected_status; @@ -1474,11 +1527,13 @@ class ProxyEnd2endTest : public End2endTest { }; TEST_P(ProxyEnd2endTest, SimpleRpc) { + MAYBE_SKIP_TEST; ResetStub(); SendRpc(stub_.get(), 1, false); } TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1489,6 +1544,7 @@ TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) { } TEST_P(ProxyEnd2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -1502,6 +1558,7 @@ TEST_P(ProxyEnd2endTest, MultipleRpcs) { // Set a 10us deadline and make sure proper error is returned. TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1527,6 +1584,7 @@ TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { // Set a long but finite deadline. TEST_P(ProxyEnd2endTest, RpcLongDeadline) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1543,6 +1601,7 @@ TEST_P(ProxyEnd2endTest, RpcLongDeadline) { // Ask server to echo back the deadline it sees. TEST_P(ProxyEnd2endTest, EchoDeadline) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1568,6 +1627,7 @@ TEST_P(ProxyEnd2endTest, EchoDeadline) { // Ask server to echo back the deadline it sees. The rpc has no deadline. TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1583,6 +1643,7 @@ TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) { } TEST_P(ProxyEnd2endTest, UnimplementedRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1598,6 +1659,7 @@ TEST_P(ProxyEnd2endTest, UnimplementedRpc) { // Client cancels rpc after 10ms TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1632,6 +1694,7 @@ TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { // Server cancels rpc after 1ms TEST_P(ProxyEnd2endTest, ServerCancelsRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1646,6 +1709,7 @@ TEST_P(ProxyEnd2endTest, ServerCancelsRpc) { // Make the response larger than the flow control window. TEST_P(ProxyEnd2endTest, HugeResponse) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1663,6 +1727,7 @@ TEST_P(ProxyEnd2endTest, HugeResponse) { } TEST_P(ProxyEnd2endTest, Peer) { + MAYBE_SKIP_TEST; // Peer is not meaningful for inproc if (GetParam().inproc) { return; @@ -1691,6 +1756,7 @@ class SecureEnd2endTest : public End2endTest { }; TEST_P(SecureEnd2endTest, SimpleRpcWithHost) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; @@ -1722,6 +1788,7 @@ bool MetadataContains( } TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(true); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -1747,6 +1814,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { } TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(true); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -1762,6 +1830,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { } TEST_P(SecureEnd2endTest, SetPerCallCredentials) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1812,6 +1881,7 @@ class CredentialsInterceptorFactory }; TEST_P(SecureEnd2endTest, CallCredentialsInterception) { + MAYBE_SKIP_TEST; if (!GetParam().use_interceptors) { return; } @@ -1841,6 +1911,7 @@ TEST_P(SecureEnd2endTest, CallCredentialsInterception) { } TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) { + MAYBE_SKIP_TEST; if (!GetParam().use_interceptors) { return; } @@ -1875,6 +1946,7 @@ TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) { } TEST_P(SecureEnd2endTest, OverridePerCallCredentials) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1912,6 +1984,7 @@ TEST_P(SecureEnd2endTest, OverridePerCallCredentials) { } TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1932,6 +2005,7 @@ TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) { } TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1951,6 +2025,7 @@ TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) { } TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; request.mutable_param()->set_skip_cancelled_check(true); @@ -1976,6 +2051,7 @@ TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) { } TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; request.mutable_param()->set_skip_cancelled_check(true); @@ -2004,6 +2080,7 @@ TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) { } TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -2027,6 +2104,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { } TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(false); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -2055,6 +2133,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { } TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(false); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -2073,6 +2152,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { } TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -2096,6 +2176,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { } TEST_P(SecureEnd2endTest, CompositeCallCreds) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -2128,6 +2209,7 @@ TEST_P(SecureEnd2endTest, CompositeCallCreds) { } TEST_P(SecureEnd2endTest, ClientAuthContext) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -2172,6 +2254,7 @@ class ResourceQuotaEnd2endTest : public End2endTest { }; TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; @@ -2269,5 +2352,6 @@ INSTANTIATE_TEST_SUITE_P( int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); + int ret = RUN_ALL_TESTS(); + return ret; } diff --git a/test/cpp/end2end/message_allocator_end2end_test.cc b/test/cpp/end2end/message_allocator_end2end_test.cc index a21ea41eb63..95bf7f4faa1 100644 --- a/test/cpp/end2end/message_allocator_end2end_test.cc +++ b/test/cpp/end2end/message_allocator_end2end_test.cc @@ -45,6 +45,17 @@ #include "test/core/util/test_config.h" #include "test/cpp/util/test_credentials_provider.h" +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, any +// callback tests can only be run if the iomgr can run in the background or if +// the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + namespace grpc { namespace testing { namespace { @@ -106,7 +117,15 @@ void TestScenario::Log() const { class MessageAllocatorEnd2endTestBase : public ::testing::TestWithParam { protected: - MessageAllocatorEnd2endTestBase() { GetParam().Log(); } + MessageAllocatorEnd2endTestBase() { + GetParam().Log(); + // if (GetParam().protocol == Protocol::TCP) { + // if (!grpc_iomgr_run_in_background()) { + // do_not_test_ = true; + // return; + // } + // } + } ~MessageAllocatorEnd2endTestBase() = default; @@ -191,6 +210,7 @@ class MessageAllocatorEnd2endTestBase } } + bool do_not_test_{false}; int picked_port_{0}; std::shared_ptr channel_; std::unique_ptr stub_; @@ -202,6 +222,7 @@ class MessageAllocatorEnd2endTestBase class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {}; TEST_P(NullAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; CreateServer(nullptr); ResetStub(); SendRpcs(1); @@ -257,6 +278,7 @@ class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase { }; TEST_P(SimpleAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; const int kRpcCount = 10; std::unique_ptr allocator(new SimpleAllocator); CreateServer(allocator.get()); @@ -271,6 +293,7 @@ TEST_P(SimpleAllocatorTest, SimpleRpc) { } TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) { + MAYBE_SKIP_TEST; const int kRpcCount = 10; std::unique_ptr allocator(new SimpleAllocator); auto mutator = [](experimental::RpcAllocatorState* allocator_state, @@ -295,6 +318,7 @@ TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) { } TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) { + MAYBE_SKIP_TEST; const int kRpcCount = 10; std::unique_ptr allocator(new SimpleAllocator); std::vector released_requests; @@ -354,6 +378,7 @@ class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase { }; TEST_P(ArenaAllocatorTest, SimpleRpc) { + MAYBE_SKIP_TEST; const int kRpcCount = 10; std::unique_ptr allocator(new ArenaAllocator); CreateServer(allocator.get()); @@ -404,6 +429,10 @@ INSTANTIATE_TEST_SUITE_P(ArenaAllocatorTest, ArenaAllocatorTest, int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); + // The grpc_init is to cover the MAYBE_SKIP_TEST. + grpc_init(); ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; } From 9c5a39c6db06325f7590adb58ad4bde3457ef81c Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 28 Jul 2020 09:55:43 -0700 Subject: [PATCH 4/4] Revert "Merge pull request #23361 from vjpai/em_agnostic_core_callback_cq" This reverts commit a46cb5e86a8ec99fa423be61253c223914cdbf2a, reversing changes made to b5d42e75fb8a7180e40cdd8ed9f6c3a1b95f1f49. --- src/core/lib/iomgr/ev_posix.cc | 2 - src/core/lib/iomgr/iomgr.cc | 14 - src/core/lib/iomgr/iomgr.h | 10 - src/core/lib/surface/completion_queue.cc | 282 +----------------- src/core/lib/surface/completion_queue.h | 8 - src/core/lib/surface/init.cc | 2 - .../end2end/client_callback_end2end_test.cc | 8 +- test/cpp/end2end/end2end_test.cc | 10 +- .../end2end/message_allocator_end2end_test.cc | 12 +- test/cpp/microbenchmarks/bm_cq.cc | 62 +--- 10 files changed, 27 insertions(+), 383 deletions(-) diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index f0cb416447f..3d32359be46 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -37,7 +37,6 @@ #include "src/core/lib/iomgr/ev_epollex_linux.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/iomgr/internal_errqueue.h" -#include "src/core/lib/iomgr/iomgr.h" GPR_GLOBAL_CONFIG_DEFINE_STRING( grpc_poll_strategy, "all", @@ -108,7 +107,6 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) { auto ret = grpc_init_poll_posix(explicit_request); real_poll_function = grpc_poll_function; grpc_poll_function = dummy_poll; - grpc_iomgr_mark_non_polling_internal(); return ret; } diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index d7da7c9151d..802e3bdcb4d 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -50,7 +50,6 @@ static gpr_cv g_rcv; static int g_shutdown; static grpc_iomgr_object g_root_object; static bool g_grpc_abort_on_leaks; -static bool g_iomgr_non_polling; void grpc_iomgr_init() { grpc_core::ExecCtx exec_ctx; @@ -193,16 +192,3 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object* obj) { } bool grpc_iomgr_abort_on_leaks(void) { return g_grpc_abort_on_leaks; } - -bool grpc_iomgr_non_polling() { - gpr_mu_lock(&g_mu); - bool ret = g_iomgr_non_polling; - gpr_mu_unlock(&g_mu); - return ret; -} - -void grpc_iomgr_mark_non_polling_internal() { - gpr_mu_lock(&g_mu); - g_iomgr_non_polling = true; - gpr_mu_unlock(&g_mu); -} diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index cb9f3eb35c8..e02f15e551c 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -45,16 +45,6 @@ void grpc_iomgr_shutdown_background_closure(); */ bool grpc_iomgr_run_in_background(); -/* Returns true if polling engine is non-polling, false otherwise. - * Currently only 'none' is non-polling. - */ -bool grpc_iomgr_non_polling(); - -/* Mark the polling engine as non-polling. For internal use only. - * Currently only 'none' is non-polling. - */ -void grpc_iomgr_mark_non_polling_internal(); - /** Returns true if the caller is a worker thread for any background poller. */ bool grpc_iomgr_is_any_background_poller_thread(); diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index c59e329fabe..a9f65bd5310 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -39,7 +39,6 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/atomic.h" -#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" @@ -209,9 +208,6 @@ struct cq_vtable { void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); - // TODO(vjpai): Remove proxy_pollset once callback_alternative no longer - // needed. - grpc_pollset* (*proxy_pollset)(grpc_completion_queue* cq); }; namespace { @@ -313,7 +309,7 @@ struct cq_pluck_data { }; struct cq_callback_data { - explicit cq_callback_data( + cq_callback_data( grpc_experimental_completion_queue_functor* shutdown_callback) : shutdown_callback(shutdown_callback) {} @@ -338,81 +334,6 @@ struct cq_callback_data { grpc_experimental_completion_queue_functor* shutdown_callback; }; -// TODO(vjpai): Remove all callback_alternative variants when event manager is -// the only supported poller. -struct cq_callback_alternative_data { - explicit cq_callback_alternative_data( - grpc_experimental_completion_queue_functor* shutdown_callback) - : implementation(SharedNextableCQ()), - shutdown_callback(shutdown_callback) {} - - /* This just points to a single shared nextable CQ */ - grpc_completion_queue* const implementation; - - /** Number of outstanding events (+1 if not shut down) - Initial count is dropped by grpc_completion_queue_shutdown */ - grpc_core::Atomic pending_events{1}; - - /** 0 initially. 1 once we initiated shutdown */ - bool shutdown_called = false; - - /** A callback that gets invoked when the CQ completes shutdown */ - grpc_experimental_completion_queue_functor* shutdown_callback; - - static grpc_completion_queue* SharedNextableCQ() { - grpc_core::MutexLock lock(&*shared_cq_next_mu); - - if (shared_cq_next == nullptr) { - shared_cq_next = grpc_completion_queue_create_for_next(nullptr); - int num_nexting_threads = GPR_CLAMP(gpr_cpu_num_cores(), 2, 32); - threads_remaining.Store(num_nexting_threads, - grpc_core::MemoryOrder::RELEASE); - for (int i = 0; i < num_nexting_threads; i++) { - grpc_core::Executor::Run( - GRPC_CLOSURE_CREATE( - [](void* arg, grpc_error* /*error*/) { - grpc_completion_queue* cq = - static_cast(arg); - while (true) { - grpc_event event = grpc_completion_queue_next( - cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - if (event.type == GRPC_QUEUE_SHUTDOWN) { - break; - } - GPR_DEBUG_ASSERT(event.type == GRPC_OP_COMPLETE); - // We can always execute the callback inline rather than - // pushing it to another Executor thread because this - // thread is definitely running on an executor, does not - // hold any application locks before executing the callback, - // and cannot be entered recursively. - auto* functor = static_cast< - grpc_experimental_completion_queue_functor*>(event.tag); - functor->functor_run(functor, event.success); - } - if (threads_remaining.FetchSub( - 1, grpc_core::MemoryOrder::ACQ_REL) == 1) { - grpc_completion_queue_destroy(cq); - } - }, - shared_cq_next, nullptr), - GRPC_ERROR_NONE, grpc_core::ExecutorType::DEFAULT, - grpc_core::ExecutorJobType::LONG); - } - } - return shared_cq_next; - } - // Use manually-constructed Mutex to avoid static construction issues - static grpc_core::ManualConstructor shared_cq_next_mu; - static grpc_completion_queue* - shared_cq_next; // GUARDED_BY(shared_cq_next_mu) - static grpc_core::Atomic threads_remaining; -}; - -grpc_core::ManualConstructor - cq_callback_alternative_data::shared_cq_next_mu; -grpc_completion_queue* cq_callback_alternative_data::shared_cq_next = nullptr; -grpc_core::Atomic cq_callback_alternative_data::threads_remaining{0}; - } // namespace /* Completion queue structure */ @@ -425,12 +346,6 @@ struct grpc_completion_queue { const cq_vtable* vtable; const cq_poller_vtable* poller_vtable; - // The pollset entry is allowed to enable proxy CQs like the - // callback_alternative. - // TODO(vjpai): Consider removing pollset and reverting to previous - // calculation of pollset once callback_alternative is no longer needed. - grpc_pollset* pollset; - #ifndef NDEBUG void** outstanding_tags; size_t outstanding_tag_count; @@ -445,17 +360,13 @@ struct grpc_completion_queue { static void cq_finish_shutdown_next(grpc_completion_queue* cq); static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); static void cq_finish_shutdown_callback(grpc_completion_queue* cq); -static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq); static void cq_shutdown_next(grpc_completion_queue* cq); static void cq_shutdown_pluck(grpc_completion_queue* cq); static void cq_shutdown_callback(grpc_completion_queue* cq); -static void cq_shutdown_callback_alternative(grpc_completion_queue* cq); static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); -static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq, - void* tag); // A cq_end_op function is called when an operation on a given CQ with // a given tag has completed. The storage argument is a reference to the @@ -478,20 +389,12 @@ static void cq_end_op_for_callback( void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, bool internal); -static void cq_end_op_for_callback_alternative( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal); - static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); -static grpc_pollset* cq_proxy_pollset_for_callback_alternative( - grpc_completion_queue* cq); - // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback static void cq_init_next( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); @@ -499,39 +402,29 @@ static void cq_init_pluck( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_init_callback( void* data, grpc_experimental_completion_queue_functor* shutdown_callback); -// poller becomes only option. -static void cq_init_callback_alternative( - void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); -static void cq_destroy_callback_alternative(void* data); /* Completion queue vtables based on the completion-type */ -// TODO(vjpai): Make this const again once we stop needing callback_alternative -static cq_vtable g_polling_cq_vtable[] = { +static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next, cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next, - nullptr, nullptr}, + nullptr}, /* GRPC_CQ_PLUCK */ {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck, cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr, - cq_pluck, nullptr}, + cq_pluck}, /* GRPC_CQ_CALLBACK */ {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback, cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback, - cq_end_op_for_callback, nullptr, nullptr, nullptr}, + cq_end_op_for_callback, nullptr, nullptr}, }; -// Separate vtable for non-polling cqs, assign at init -static cq_vtable g_nonpolling_cq_vtable[sizeof(g_polling_cq_vtable) / - sizeof(g_polling_cq_vtable[0])]; - #define DATA_FROM_CQ(cq) ((void*)(cq + 1)) -#define INLINE_POLLSET_FROM_CQ(cq) \ +#define POLLSET_FROM_CQ(cq) \ ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq))) -#define POLLSET_FROM_CQ(cq) (cq->pollset) grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck"); @@ -550,46 +443,6 @@ static void on_pollset_shutdown_done(void* cq, grpc_error* error); void grpc_cq_global_init() { gpr_tls_init(&g_cached_event); gpr_tls_init(&g_cached_cq); - g_nonpolling_cq_vtable[GRPC_CQ_NEXT] = g_polling_cq_vtable[GRPC_CQ_NEXT]; - g_nonpolling_cq_vtable[GRPC_CQ_PLUCK] = g_polling_cq_vtable[GRPC_CQ_PLUCK]; - g_nonpolling_cq_vtable[GRPC_CQ_CALLBACK] = - g_polling_cq_vtable[GRPC_CQ_CALLBACK]; -} - -// TODO(vjpai): Remove when callback_alternative is no longer needed -void grpc_cq_init() { - // If the iomgr runs in the background, we can use the preferred callback CQ. - // If the iomgr is non-polling, we cannot use the alternative callback CQ. - if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) { - cq_callback_alternative_data::shared_cq_next_mu.Init(); - g_polling_cq_vtable[GRPC_CQ_CALLBACK] = { - GRPC_CQ_CALLBACK, - sizeof(cq_callback_alternative_data), - cq_init_callback_alternative, - cq_shutdown_callback_alternative, - cq_destroy_callback_alternative, - cq_begin_op_for_callback_alternative, - cq_end_op_for_callback_alternative, - nullptr, - nullptr, - cq_proxy_pollset_for_callback_alternative}; - } -} - -// TODO(vjpai): Remove when callback_alternative is no longer needed -void grpc_cq_shutdown() { - if (!grpc_iomgr_run_in_background() && !grpc_iomgr_non_polling()) { - { - grpc_core::MutexLock lock( - &*cq_callback_alternative_data::shared_cq_next_mu); - if (cq_callback_alternative_data::shared_cq_next != nullptr) { - grpc_completion_queue_shutdown( - cq_callback_alternative_data::shared_cq_next); - } - cq_callback_alternative_data::shared_cq_next = nullptr; - } - cq_callback_alternative_data::shared_cq_next_mu.Destroy(); - } } void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) { @@ -668,9 +521,7 @@ grpc_completion_queue* grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); - const cq_vtable* vtable = (polling_type == GRPC_CQ_NON_POLLING) - ? &g_nonpolling_cq_vtable[completion_type] - : &g_polling_cq_vtable[completion_type]; + const cq_vtable* vtable = &g_cq_vtable[completion_type]; const cq_poller_vtable* poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; @@ -687,18 +538,9 @@ grpc_completion_queue* grpc_completion_queue_create_internal( /* One for destroy(), one for pollset_shutdown */ new (&cq->owning_refs) grpc_core::RefCount(2); + poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); vtable->init(DATA_FROM_CQ(cq), shutdown_callback); - // TODO(vjpai): When callback_alternative is no longer needed, cq->pollset can - // be removed and the nullptr proxy_pollset value below can be the definition - // of POLLSET_FROM_CQ. - cq->pollset = cq->vtable->proxy_pollset == nullptr - ? INLINE_POLLSET_FROM_CQ(cq) - : cq->vtable->proxy_pollset(cq); - // Init the inline pollset. If a proxy CQ is used, the proxy pollset will be - // init'ed in its CQ init. - cq->poller_vtable->init(INLINE_POLLSET_FROM_CQ(cq), &cq->mu); - GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, grpc_schedule_on_exec_ctx); return cq; @@ -736,17 +578,6 @@ static void cq_destroy_callback(void* data) { cqd->~cq_callback_data(); } -static void cq_init_callback_alternative( - void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { - new (data) cq_callback_alternative_data(shutdown_callback); -} - -static void cq_destroy_callback_alternative(void* data) { - cq_callback_alternative_data* cqd = - static_cast(data); - cqd->~cq_callback_alternative_data(); -} - grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { return cq->vtable->cq_completion_type; } @@ -787,9 +618,7 @@ void grpc_cq_internal_unref(grpc_completion_queue* cq) { #endif if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) { cq->vtable->destroy(DATA_FROM_CQ(cq)); - // Only destroy the inlined pollset. If a proxy CQ is used, the proxy - // pollset will be destroyed by the proxy CQ. - cq->poller_vtable->destroy(INLINE_POLLSET_FROM_CQ(cq)); + cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); #ifndef NDEBUG gpr_free(cq->outstanding_tags); #endif @@ -840,14 +669,6 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) { return cqd->pending_events.IncrementIfNonzero(); } -static bool cq_begin_op_for_callback_alternative(grpc_completion_queue* cq, - void* tag) { - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - return grpc_cq_begin_op(cqd->implementation, tag) && - cqd->pending_events.IncrementIfNonzero(); -} - bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { #ifndef NDEBUG gpr_mu_lock(cq->mu); @@ -1011,7 +832,7 @@ static void cq_end_op_for_pluck( GRPC_ERROR_UNREF(error); } -void functor_callback(void* arg, grpc_error* error) { +static void functor_callback(void* arg, grpc_error* error) { auto* functor = static_cast(arg); functor->functor_run(functor, error == GRPC_ERROR_NONE); } @@ -1071,40 +892,6 @@ static void cq_end_op_for_callback( GRPC_CLOSURE_CREATE(functor_callback, functor, nullptr), error); } -static void cq_end_op_for_callback_alternative( - grpc_completion_queue* cq, void* tag, grpc_error* error, - void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { - GPR_TIMER_SCOPE("cq_end_op_for_callback_alternative", 0); - - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - - if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || - (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && - error != GRPC_ERROR_NONE)) { - const char* errmsg = grpc_error_string(error); - GRPC_API_TRACE( - "cq_end_op_for_callback_alternative(cq=%p, tag=%p, error=%s, " - "done=%p, done_arg=%p, storage=%p)", - 6, (cq, tag, errmsg, done, done_arg, storage)); - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) && - error != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); - } - } - - // Pass through the actual work to the internal nextable CQ - grpc_cq_end_op(cqd->implementation, tag, error, done, done_arg, storage, - internal); - - cq_check_tag(cq, tag, true); /* Used in debug builds only */ - - if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { - cq_finish_shutdown_callback_alternative(cq); - } -} - void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, grpc_cq_completion* storage, @@ -1112,13 +899,6 @@ void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } -static grpc_pollset* cq_proxy_pollset_for_callback_alternative( - grpc_completion_queue* cq) { - cq_callback_alternative_data* cqd = - static_cast(DATA_FROM_CQ(cq)); - return POLLSET_FROM_CQ(cqd->implementation); -} - struct cq_is_finished_arg { gpr_atm last_seen_things_queued_ever; grpc_completion_queue* cq; @@ -1599,21 +1379,6 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GRPC_ERROR_NONE); } -static void cq_finish_shutdown_callback_alternative(grpc_completion_queue* cq) { - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - auto* callback = cqd->shutdown_callback; - - GPR_ASSERT(cqd->shutdown_called); - - // Shutdown the non-proxy pollset - cq->poller_vtable->shutdown(INLINE_POLLSET_FROM_CQ(cq), - &cq->pollset_shutdown_done); - grpc_core::Executor::Run( - GRPC_CLOSURE_CREATE(functor_callback, callback, nullptr), - GRPC_ERROR_NONE); -} - static void cq_shutdown_callback(grpc_completion_queue* cq) { cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -1640,33 +1405,6 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } -static void cq_shutdown_callback_alternative(grpc_completion_queue* cq) { - cq_callback_alternative_data* cqd = - static_cast DATA_FROM_CQ(cq); - - /* Need an extra ref for cq here because: - * We call cq_finish_shutdown_callback() below, which calls pollset shutdown. - * Pollset shutdown decrements the cq ref count which can potentially destroy - * the cq (if that happens to be the last ref). - * Creating an extra ref here prevents the cq from getting destroyed while - * this function is still active */ - GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)"); - gpr_mu_lock(cq->mu); - if (cqd->shutdown_called) { - gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); - return; - } - cqd->shutdown_called = true; - if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) { - gpr_mu_unlock(cq->mu); - cq_finish_shutdown_callback_alternative(cq); - } else { - gpr_mu_unlock(cq->mu); - } - GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); -} - /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 002f7b0728b..4a114be8285 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -69,14 +69,6 @@ void grpc_cq_internal_unref(grpc_completion_queue* cc); /* Initializes global variables used by completion queues */ void grpc_cq_global_init(); -// Completion queue initializations that must be done after iomgr -// TODO(vjpai): Remove when callback_alternative is no longer needed. -void grpc_cq_init(); - -// Completion queue shutdowns that must be done before iomgr shutdown. -// TODO(vjpai): Remove when callback_alternative is no longer needed. -void grpc_cq_shutdown(); - /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. Return true on success, and diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 3bbd7dc53d6..d8bc4e4dd32 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -144,7 +144,6 @@ void grpc_init(void) { grpc_core::ApplicationCallbackExecCtx::GlobalInit(); grpc_core::ExecCtx::GlobalInit(); grpc_iomgr_init(); - grpc_cq_init(); gpr_timers_global_init(); grpc_core::HandshakerRegistry::Init(); grpc_security_init(); @@ -170,7 +169,6 @@ void grpc_shutdown_internal_locked(void) { int i; { grpc_core::ExecCtx exec_ctx(0); - grpc_cq_shutdown(); grpc_iomgr_shutdown_background_closure(); { grpc_timer_manager_set_threading(false); // shutdown timer_manager thread diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 63e726d0cde..5387e5f0af9 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -130,10 +130,10 @@ class ClientCallbackEnd2endTest server_ = builder.BuildAndStart(); is_server_started_ = true; - // if (GetParam().protocol == Protocol::TCP && - // !grpc_iomgr_run_in_background()) { - // do_not_test_ = true; - // } + if (GetParam().protocol == Protocol::TCP && + !grpc_iomgr_run_in_background()) { + do_not_test_ = true; + } } void ResetStub() { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index e6538344b03..f157d20cc6f 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -328,11 +328,11 @@ class End2endTest : public ::testing::TestWithParam { } void SetUp() override { - // if (GetParam().callback_server && !GetParam().inproc && - // !grpc_iomgr_run_in_background()) { - // do_not_test_ = true; - // return; - // } + if (GetParam().callback_server && !GetParam().inproc && + !grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } } void TearDown() override { diff --git a/test/cpp/end2end/message_allocator_end2end_test.cc b/test/cpp/end2end/message_allocator_end2end_test.cc index 95bf7f4faa1..c15066794bc 100644 --- a/test/cpp/end2end/message_allocator_end2end_test.cc +++ b/test/cpp/end2end/message_allocator_end2end_test.cc @@ -119,12 +119,12 @@ class MessageAllocatorEnd2endTestBase protected: MessageAllocatorEnd2endTestBase() { GetParam().Log(); - // if (GetParam().protocol == Protocol::TCP) { - // if (!grpc_iomgr_run_in_background()) { - // do_not_test_ = true; - // return; - // } - // } + if (GetParam().protocol == Protocol::TCP) { + if (!grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } + } } ~MessageAllocatorEnd2endTestBase() = default; diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 1640c4d52f0..c53eb2b9413 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -69,11 +69,6 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(void* /*arg*/, grpc_cq_completion* /*completion*/) {} -static void DoneWithCompletionOnHeap(void* /*arg*/, - grpc_cq_completion* completion) { - delete completion; -} - class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** /*tag*/, bool* /*status*/) override { @@ -210,15 +205,8 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_init(&shutdown_cv); bool got_shutdown = false; ShutdownCallback shutdown_cb(&got_shutdown); - // This test with stack-allocated completions only works for non-polling or - // EM-polling callback core CQs. For generality, test with non-polling. - grpc_completion_queue_attributes attr; - attr.version = 2; - attr.cq_completion_type = GRPC_CQ_CALLBACK; - attr.cq_polling_type = GRPC_CQ_NON_POLLING; - attr.cq_shutdown_cb = &shutdown_cb; - grpc_completion_queue* cc = grpc_completion_queue_create( - grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); + grpc_completion_queue* cc = + grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); for (auto _ : state) { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; @@ -252,53 +240,7 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_cv_destroy(&shutdown_cv); gpr_mu_destroy(&shutdown_mu); } -static void BM_Callback_CQ_Pass1CoreHeapCompletion(benchmark::State& state) { - TrackCounters track_counters; - int iteration = 0, current_iterations = 0; - TagCallback tag_cb(&iteration); - gpr_mu_init(&mu); - gpr_cv_init(&cv); - gpr_mu_init(&shutdown_mu); - gpr_cv_init(&shutdown_cv); - bool got_shutdown = false; - ShutdownCallback shutdown_cb(&got_shutdown); - grpc_completion_queue* cc = - grpc_completion_queue_create_for_callback(&shutdown_cb, nullptr); - for (auto _ : state) { - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; - grpc_core::ExecCtx exec_ctx; - grpc_cq_completion* completion = new grpc_cq_completion; - GPR_ASSERT(grpc_cq_begin_op(cc, &tag_cb)); - grpc_cq_end_op(cc, &tag_cb, GRPC_ERROR_NONE, DoneWithCompletionOnHeap, - nullptr, completion); - } - shutdown_and_destroy(cc); - - gpr_mu_lock(&mu); - current_iterations = static_cast(state.iterations()); - while (current_iterations != iteration) { - // Wait for all the callbacks to complete. - gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&mu); - - gpr_mu_lock(&shutdown_mu); - while (!got_shutdown) { - // Wait for the shutdown callback to complete. - gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); - } - gpr_mu_unlock(&shutdown_mu); - - GPR_ASSERT(got_shutdown); - GPR_ASSERT(iteration == static_cast(state.iterations())); - track_counters.Finish(state); - gpr_cv_destroy(&cv); - gpr_mu_destroy(&mu); - gpr_cv_destroy(&shutdown_cv); - gpr_mu_destroy(&shutdown_mu); -} BENCHMARK(BM_Callback_CQ_Pass1Core); -BENCHMARK(BM_Callback_CQ_Pass1CoreHeapCompletion); } // namespace testing } // namespace grpc