From a47c979ba07937cd6f5e67b0763a0908502f9b28 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 14 Jan 2019 14:29:27 -0800 Subject: [PATCH] Enable TCP callback tests if the event engine allows --- src/core/lib/iomgr/iomgr.h | 5 ++ src/core/lib/iomgr/iomgr_posix.cc | 4 + src/core/lib/iomgr/iomgr_windows.cc | 2 + .../end2end/client_callback_end2end_test.cc | 66 +++++++++++++- test/cpp/end2end/end2end_test.cc | 86 ++++++++++++++++++- test/cpp/end2end/test_service_impl.cc | 17 +++- 6 files changed, 173 insertions(+), 7 deletions(-) diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 6261aa550c3..74775de8146 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -39,6 +39,11 @@ void grpc_iomgr_shutdown(); * background poller. */ void grpc_iomgr_shutdown_background_closure(); +/* Returns true if polling engine runs in the background, false otherwise. + * Currently only 'epollbg' runs in the background. + */ +bool grpc_iomgr_run_in_background(); + /** Returns true if the caller is a worker thread for any background poller. */ bool grpc_iomgr_is_any_background_poller_thread(); diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 278c8de6886..690e81f3b1d 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -74,4 +74,8 @@ void grpc_set_default_iomgr_platform() { grpc_set_iomgr_platform_vtable(&vtable); } +bool grpc_iomgr_run_in_background() { + return grpc_event_engine_run_in_background(); +} + #endif /* GRPC_POSIX_SOCKET_IOMGR */ diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index 0579e16aa76..e517a6caee4 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -92,4 +92,6 @@ void grpc_set_default_iomgr_platform() { grpc_set_iomgr_platform_vtable(&vtable); } +bool grpc_iomgr_run_in_background() { return false; } + #endif /* GRPC_WINSOCK_SOCKET */ diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index a999321992f..30db5b8c01c 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -30,7 +31,9 @@ #include #include +#include "src/core/lib/iomgr/iomgr.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/util/byte_buffer_proto_helper.h" @@ -38,15 +41,30 @@ #include +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, any +// callback tests can only be run if the iomgr can run in the background or if +// the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + namespace grpc { namespace testing { namespace { +enum class Protocol { INPROC, TCP }; + class TestScenario { public: - TestScenario(bool serve_callback) : callback_server(serve_callback) {} + TestScenario(bool serve_callback, Protocol protocol) + : callback_server(serve_callback), protocol(protocol) {} void Log() const; bool callback_server; + Protocol protocol; }; static std::ostream& operator<<(std::ostream& out, @@ -69,6 +87,16 @@ class ClientCallbackEnd2endTest void SetUp() override { ServerBuilder builder; + if (GetParam().protocol == Protocol::TCP) { + if (!grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); + } if (!GetParam().callback_server) { builder.RegisterService(&service_); } else { @@ -81,7 +109,17 @@ class ClientCallbackEnd2endTest void ResetStub() { ChannelArguments args; - channel_ = server_->InProcessChannel(args); + switch (GetParam().protocol) { + case Protocol::TCP: + channel_ = + CreateChannel(server_address_.str(), InsecureChannelCredentials()); + break; + case Protocol::INPROC: + channel_ = server_->InProcessChannel(args); + break; + default: + assert(false); + } stub_ = grpc::testing::EchoTestService::NewStub(channel_); generic_stub_.reset(new GenericStub(channel_)); } @@ -243,26 +281,31 @@ class ClientCallbackEnd2endTest rpc.Await(); } } - bool is_server_started_; + bool do_not_test_{false}; + bool is_server_started_{false}; std::shared_ptr channel_; std::unique_ptr stub_; std::unique_ptr generic_stub_; TestServiceImpl service_; CallbackTestServiceImpl callback_service_; std::unique_ptr server_; + std::ostringstream server_address_; }; TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(1, false); } TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(10, false); } TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) { + MAYBE_SKIP_TEST; ResetStub(); SimpleRequest request; SimpleResponse response; @@ -289,38 +332,45 @@ TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) { } TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(1, true); } TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcs(10, true); } TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcsGeneric(10, false); } TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) { + MAYBE_SKIP_TEST; ResetStub(); SendGenericEchoAsBidi(10, 1); } TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) { + MAYBE_SKIP_TEST; ResetStub(); SendGenericEchoAsBidi(10, 10); } #if GRPC_ALLOW_EXCEPTIONS TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { + MAYBE_SKIP_TEST; ResetStub(); SendRpcsGeneric(10, true); } #endif TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -333,6 +383,7 @@ TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { } TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -345,6 +396,7 @@ TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) { } TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -370,6 +422,7 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { } TEST_P(ClientCallbackEnd2endTest, RequestStream) { + MAYBE_SKIP_TEST; ResetStub(); class Client : public grpc::experimental::ClientWriteReactor { public: @@ -416,6 +469,7 @@ TEST_P(ClientCallbackEnd2endTest, RequestStream) { } TEST_P(ClientCallbackEnd2endTest, ResponseStream) { + MAYBE_SKIP_TEST; ResetStub(); class Client : public grpc::experimental::ClientReadReactor { public: @@ -463,6 +517,7 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStream) { } TEST_P(ClientCallbackEnd2endTest, BidiStream) { + MAYBE_SKIP_TEST; ResetStub(); class Client : public grpc::experimental::ClientBidiReactor { @@ -519,7 +574,10 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) { test.Await(); } -TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}}; +TestScenario scenarios[]{{false, Protocol::INPROC}, + {false, Protocol::TCP}, + {true, Protocol::INPROC}, + {true, Protocol::TCP}}; INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, ::testing::ValuesIn(scenarios)); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 4bddbb4bdf2..f58a472bfaf 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -35,6 +35,7 @@ #include #include "src/core/lib/gpr/env.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -52,6 +53,17 @@ using grpc::testing::EchoResponse; using grpc::testing::kTlsCredentialsType; using std::chrono::system_clock; +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, +// tests that use the callback server can only be run if the iomgr can run in +// the background or if the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + namespace grpc { namespace testing { namespace { @@ -237,6 +249,14 @@ class End2endTest : public ::testing::TestWithParam { GetParam().Log(); } + void SetUp() override { + if (GetParam().callback_server && !GetParam().inproc && + !grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } + } + void TearDown() override { if (is_server_started_) { server_->Shutdown(); @@ -361,6 +381,7 @@ class End2endTest : public ::testing::TestWithParam { DummyInterceptor::Reset(); } + bool do_not_test_{false}; bool is_server_started_; std::shared_ptr channel_; std::unique_ptr stub_; @@ -416,6 +437,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestRequestStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { + MAYBE_SKIP_TEST; RestartServer(std::shared_ptr()); ResetStub(); EchoRequest request; @@ -494,6 +516,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestResponseStreamServerCancel( ServerTryCancelRequestPhase server_try_cancel) { + MAYBE_SKIP_TEST; RestartServer(std::shared_ptr()); ResetStub(); EchoRequest request; @@ -575,6 +598,7 @@ class End2endServerTryCancelTest : public End2endTest { // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, int num_messages) { + MAYBE_SKIP_TEST; RestartServer(std::shared_ptr()); ResetStub(); EchoRequest request; @@ -650,6 +674,7 @@ class End2endServerTryCancelTest : public End2endTest { }; TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -712,6 +737,7 @@ TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { } TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) { + MAYBE_SKIP_TEST; // User-Agent is an HTTP header for HTTP transports only if (GetParam().inproc) { return; @@ -735,6 +761,7 @@ TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) { } TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -747,6 +774,7 @@ TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { } TEST_P(End2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -759,6 +787,7 @@ TEST_P(End2endTest, MultipleRpcs) { } TEST_P(End2endTest, EmptyBinaryMetadata) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -771,6 +800,7 @@ TEST_P(End2endTest, EmptyBinaryMetadata) { } TEST_P(End2endTest, ReconnectChannel) { + MAYBE_SKIP_TEST; if (GetParam().inproc) { return; } @@ -796,6 +826,7 @@ TEST_P(End2endTest, ReconnectChannel) { } TEST_P(End2endTest, RequestStreamOneRequest) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -812,6 +843,7 @@ TEST_P(End2endTest, RequestStreamOneRequest) { } TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -827,6 +859,7 @@ TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { } TEST_P(End2endTest, RequestStreamTwoRequests) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -843,6 +876,7 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { } TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -859,6 +893,7 @@ TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { } TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -875,6 +910,7 @@ TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { } TEST_P(End2endTest, ResponseStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -893,6 +929,7 @@ TEST_P(End2endTest, ResponseStream) { } TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -914,6 +951,7 @@ TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { // This was added to prevent regression from issue: // https://github.com/grpc/grpc/issues/11546 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -935,6 +973,7 @@ TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { } TEST_P(End2endTest, BidiStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -959,6 +998,7 @@ TEST_P(End2endTest, BidiStream) { } TEST_P(End2endTest, BidiStreamWithCoalescingApi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -994,6 +1034,7 @@ TEST_P(End2endTest, BidiStreamWithCoalescingApi) { // This was added to prevent regression from issue: // https://github.com/grpc/grpc/issues/11546 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1019,6 +1060,7 @@ TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1047,6 +1089,7 @@ void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) { } TEST_P(End2endTest, CancelRpcBeforeStart) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1063,6 +1106,7 @@ TEST_P(End2endTest, CancelRpcBeforeStart) { // Client cancels request stream after sending two messages TEST_P(End2endTest, ClientCancelsRequestStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1086,6 +1130,7 @@ TEST_P(End2endTest, ClientCancelsRequestStream) { // Client cancels server stream after sending some messages TEST_P(End2endTest, ClientCancelsResponseStream) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1121,6 +1166,7 @@ TEST_P(End2endTest, ClientCancelsResponseStream) { // Client cancels bidi stream after sending some messages TEST_P(End2endTest, ClientCancelsBidi) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1156,6 +1202,7 @@ TEST_P(End2endTest, ClientCancelsBidi) { } TEST_P(End2endTest, RpcMaxMessageSize) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1178,6 +1225,7 @@ void ReaderThreadFunc(ClientReaderWriter* stream, // Run a Read and a WritesDone simultaneously. TEST_P(End2endTest, SimultaneousReadWritesDone) { + MAYBE_SKIP_TEST; ResetStub(); ClientContext context; gpr_event ev; @@ -1192,6 +1240,7 @@ TEST_P(End2endTest, SimultaneousReadWritesDone) { } TEST_P(End2endTest, ChannelState) { + MAYBE_SKIP_TEST; if (GetParam().inproc) { return; } @@ -1242,6 +1291,7 @@ TEST_P(End2endTest, ChannelStateTimeout) { // Talking to a non-existing service. TEST_P(End2endTest, NonExistingService) { + MAYBE_SKIP_TEST; ResetChannel(); std::unique_ptr stub; stub = grpc::testing::UnimplementedEchoService::NewStub(channel_); @@ -1259,6 +1309,7 @@ TEST_P(End2endTest, NonExistingService) { // Ask the server to send back a serialized proto in trailer. // This is an example of setting error details. TEST_P(End2endTest, BinaryTrailerTest) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1285,6 +1336,7 @@ TEST_P(End2endTest, BinaryTrailerTest) { } TEST_P(End2endTest, ExpectErrorTest) { + MAYBE_SKIP_TEST; ResetStub(); std::vector expected_status; @@ -1336,11 +1388,13 @@ class ProxyEnd2endTest : public End2endTest { }; TEST_P(ProxyEnd2endTest, SimpleRpc) { + MAYBE_SKIP_TEST; ResetStub(); SendRpc(stub_.get(), 1, false); } TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1351,6 +1405,7 @@ TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) { } TEST_P(ProxyEnd2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; ResetStub(); std::vector threads; threads.reserve(10); @@ -1364,6 +1419,7 @@ TEST_P(ProxyEnd2endTest, MultipleRpcs) { // Set a 10us deadline and make sure proper error is returned. TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1389,6 +1445,7 @@ TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { // Set a long but finite deadline. TEST_P(ProxyEnd2endTest, RpcLongDeadline) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1405,6 +1462,7 @@ TEST_P(ProxyEnd2endTest, RpcLongDeadline) { // Ask server to echo back the deadline it sees. TEST_P(ProxyEnd2endTest, EchoDeadline) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1430,6 +1488,7 @@ TEST_P(ProxyEnd2endTest, EchoDeadline) { // Ask server to echo back the deadline it sees. The rpc has no deadline. TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1445,6 +1504,7 @@ TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) { } TEST_P(ProxyEnd2endTest, UnimplementedRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1460,6 +1520,7 @@ TEST_P(ProxyEnd2endTest, UnimplementedRpc) { // Client cancels rpc after 10ms TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1494,6 +1555,7 @@ TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { // Server cancels rpc after 1ms TEST_P(ProxyEnd2endTest, ServerCancelsRpc) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1508,6 +1570,7 @@ TEST_P(ProxyEnd2endTest, ServerCancelsRpc) { // Make the response larger than the flow control window. TEST_P(ProxyEnd2endTest, HugeResponse) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1525,6 +1588,7 @@ TEST_P(ProxyEnd2endTest, HugeResponse) { } TEST_P(ProxyEnd2endTest, Peer) { + MAYBE_SKIP_TEST; // Peer is not meaningful for inproc if (GetParam().inproc) { return; @@ -1553,6 +1617,7 @@ class SecureEnd2endTest : public End2endTest { }; TEST_P(SecureEnd2endTest, SimpleRpcWithHost) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; @@ -1584,6 +1649,7 @@ bool MetadataContains( } TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(true); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -1609,6 +1675,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { } TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(true); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -1624,6 +1691,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { } TEST_P(SecureEnd2endTest, SetPerCallCredentials) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1646,6 +1714,7 @@ TEST_P(SecureEnd2endTest, SetPerCallCredentials) { } TEST_P(SecureEnd2endTest, OverridePerCallCredentials) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1677,6 +1746,7 @@ TEST_P(SecureEnd2endTest, OverridePerCallCredentials) { } TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1694,6 +1764,7 @@ TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) { } TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1711,6 +1782,7 @@ TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) { } TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1732,6 +1804,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { } TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(false); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -1757,6 +1830,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { } TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { + MAYBE_SKIP_TEST; auto* processor = new TestAuthMetadataProcessor(false); StartServer(std::shared_ptr(processor)); ResetStub(); @@ -1772,6 +1846,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { } TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1793,6 +1868,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { } TEST_P(SecureEnd2endTest, CompositeCallCreds) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1821,6 +1897,7 @@ TEST_P(SecureEnd2endTest, CompositeCallCreds) { } TEST_P(SecureEnd2endTest, ClientAuthContext) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; EchoResponse response; @@ -1865,6 +1942,7 @@ class ResourceQuotaEnd2endTest : public End2endTest { }; TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) { + MAYBE_SKIP_TEST; ResetStub(); EchoRequest request; @@ -1899,11 +1977,17 @@ std::vector CreateTestScenarios(bool use_proxy, credentials_types.push_back(kInsecureCredentialsType); } - // For now test callback server only with inproc + // Test callback with inproc or if the event-engine allows it GPR_ASSERT(!credentials_types.empty()); for (const auto& cred : credentials_types) { scenarios.emplace_back(false, false, false, cred, false); scenarios.emplace_back(true, false, false, cred, false); + if (test_callback_server) { + // Note that these scenarios will be dynamically disabled if the event + // engine doesn't run in the background + scenarios.emplace_back(false, false, false, cred, true); + scenarios.emplace_back(true, false, false, cred, true); + } if (use_proxy) { scenarios.emplace_back(false, true, false, cred, false); scenarios.emplace_back(true, true, false, cred, false); diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 6729ad14f4a..8c2df1acc33 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -670,6 +670,8 @@ CallbackTestServiceImpl::ResponseStream() { void OnWriteDone(bool ok) override { if (num_msgs_sent_ < server_responses_to_send_) { NextWrite(); + } else if (server_coalescing_api_ != 0) { + // We would have already done Finish just after the WriteLast } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { // Let OnCancel recover this } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { @@ -695,6 +697,8 @@ CallbackTestServiceImpl::ResponseStream() { server_coalescing_api_ != 0) { num_msgs_sent_++; StartWriteLast(&response_, WriteOptions()); + // If we use WriteLast, we shouldn't wait before attempting Finish + FinishOnce(Status::OK); } else { num_msgs_sent_++; StartWrite(&response_); @@ -753,10 +757,14 @@ CallbackTestServiceImpl::BidiStream() { response_.set_message(request_.message()); if (num_msgs_read_ == server_write_last_) { StartWriteLast(&response_, WriteOptions()); + // If we use WriteLast, we shouldn't wait before attempting Finish } else { StartWrite(&response_); + return; } - } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + } + + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { // Let OnCancel handle this } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { ServerTryCancelNonblocking(ctx_); @@ -764,7 +772,12 @@ CallbackTestServiceImpl::BidiStream() { FinishOnce(Status::OK); } } - void OnWriteDone(bool ok) override { StartRead(&request_); } + void OnWriteDone(bool ok) override { + std::lock_guard l(finish_mu_); + if (!finished_) { + StartRead(&request_); + } + } private: void FinishOnce(const Status& s) {