diff --git a/CMakeLists.txt b/CMakeLists.txt index d5f54cb442d..ce8a7e338c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -815,6 +815,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx stats_test) add_dependencies(buildtests_cxx status_metadata_test) add_dependencies(buildtests_cxx status_util_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx stranded_event_test) + endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx streaming_throughput_test) endif() @@ -13992,6 +13995,48 @@ target_link_libraries(status_util_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(stranded_event_test + test/core/end2end/cq_verifier.cc + test/core/iomgr/stranded_event_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(stranded_event_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(stranded_event_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr + address_sorting + upb + ${_gRPC_GFLAGS_LIBRARIES} + ) + + +endif() endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/Makefile b/Makefile index f2da71bf980..b86df824e22 100644 --- a/Makefile +++ b/Makefile @@ -1280,6 +1280,7 @@ static_metadata_test: $(BINDIR)/$(CONFIG)/static_metadata_test stats_test: $(BINDIR)/$(CONFIG)/stats_test status_metadata_test: $(BINDIR)/$(CONFIG)/status_metadata_test status_util_test: $(BINDIR)/$(CONFIG)/status_util_test +stranded_event_test: $(BINDIR)/$(CONFIG)/stranded_event_test streaming_throughput_test: $(BINDIR)/$(CONFIG)/streaming_throughput_test string_ref_test: $(BINDIR)/$(CONFIG)/string_ref_test test_cpp_client_credentials_test: $(BINDIR)/$(CONFIG)/test_cpp_client_credentials_test @@ -1640,6 +1641,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/stats_test \ $(BINDIR)/$(CONFIG)/status_metadata_test \ $(BINDIR)/$(CONFIG)/status_util_test \ + $(BINDIR)/$(CONFIG)/stranded_event_test \ $(BINDIR)/$(CONFIG)/streaming_throughput_test \ $(BINDIR)/$(CONFIG)/string_ref_test \ $(BINDIR)/$(CONFIG)/test_cpp_client_credentials_test \ @@ -1798,6 +1800,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/stats_test \ $(BINDIR)/$(CONFIG)/status_metadata_test \ $(BINDIR)/$(CONFIG)/status_util_test \ + $(BINDIR)/$(CONFIG)/stranded_event_test \ $(BINDIR)/$(CONFIG)/streaming_throughput_test \ $(BINDIR)/$(CONFIG)/string_ref_test \ $(BINDIR)/$(CONFIG)/test_cpp_client_credentials_test \ @@ -2323,6 +2326,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/status_metadata_test || ( echo test status_metadata_test failed ; exit 1 ) $(E) "[RUN] Testing status_util_test" $(Q) $(BINDIR)/$(CONFIG)/status_util_test || ( echo test status_util_test failed ; exit 1 ) + $(E) "[RUN] Testing stranded_event_test" + $(Q) $(BINDIR)/$(CONFIG)/stranded_event_test || ( echo test stranded_event_test failed ; exit 1 ) $(E) "[RUN] Testing streaming_throughput_test" $(Q) $(BINDIR)/$(CONFIG)/streaming_throughput_test || ( echo test streaming_throughput_test failed ; exit 1 ) $(E) "[RUN] Testing string_ref_test" @@ -17920,6 +17925,52 @@ endif endif +STRANDED_EVENT_TEST_SRC = \ + test/core/end2end/cq_verifier.cc \ + test/core/iomgr/stranded_event_test.cc \ + +STRANDED_EVENT_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(STRANDED_EVENT_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/stranded_event_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.12.0+. + +$(BINDIR)/$(CONFIG)/stranded_event_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/stranded_event_test: $(PROTOBUF_DEP) $(STRANDED_EVENT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(STRANDED_EVENT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/stranded_event_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/core/end2end/cq_verifier.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +$(OBJDIR)/$(CONFIG)/test/core/iomgr/stranded_event_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +deps_stranded_event_test: $(STRANDED_EVENT_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(STRANDED_EVENT_TEST_OBJS:.o=.dep) +endif +endif + + STREAMING_THROUGHPUT_TEST_SRC = \ $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc \ $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 0a8df78a8ca..23a3e3c0554 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7223,6 +7223,25 @@ targets: - address_sorting - upb uses_polling: false +- name: stranded_event_test + gtest: true + build: test + language: c++ + headers: + - test/core/end2end/cq_verifier.h + src: + - test/core/end2end/cq_verifier.cc + - test/core/iomgr/stranded_event_test.cc + deps: + - grpc_test_util + - grpc + - gpr + - address_sorting + - upb + platforms: + - linux + - posix + - mac - name: streaming_throughput_test gtest: true build: test diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 574977d2cc9..055251f7525 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -67,10 +67,6 @@ //#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 #define MAX_EPOLL_EVENTS 100 -// TODO(juanlishen): We use a greater-than-one value here as a workaround fix to -// a keepalive ping timeout issue. We may want to revert https://github -// .com/grpc/grpc/pull/14943 once we figure out the root cause. -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 #define MAX_FDS_IN_CACHE 32 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, @@ -873,8 +869,6 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, (pollable_obj->event_count - pollable_obj->event_cursor) / worker_count; if (handle_count == 0) { handle_count = 1; - } else if (handle_count > MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) { - handle_count = MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL; } grpc_error* error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < handle_count) && diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index f7c8722bcdc..ed9e51cf432 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -366,3 +366,24 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) + +grpc_cc_test( + name = "stranded_event_test", + srcs = ["stranded_event_test.cc"], + external_deps = [ + "gtest", + ], + language = "C++", + tags = [ + # TODO(apolcyn): This test is failing on Windows at entry, enable once passing. + # See e.g. https://source.cloud.google.com/results/invocations/03e2c2bc-1742-48b4-a33d-b4cdaee5c8f9/targets + # E0717 23:43:56.391000000 5488 src/core/lib/surface/server.cc:1630] assertion failed: server->listeners_destroyed == server->listeners.size() + "no_windows", + ], + deps = [ + "//:gpr", + "//:grpc", + "//test/core/end2end:cq_verifier", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/iomgr/stranded_event_test.cc b/test/core/iomgr/stranded_event_test.cc new file mode 100644 index 00000000000..c950f107806 --- /dev/null +++ b/test/core/iomgr/stranded_event_test.cc @@ -0,0 +1,431 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" +#include "absl/types/optional.h" + +#include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/security/credentials/alts/alts_credentials.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/security_connector/alts/alts_security_connector.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/uri/uri_parser.h" + +#include "test/core/util/memory_counters.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +#include "test/core/end2end/cq_verifier.h" + +namespace { + +const int kNumMessagePingPongsPerCall = 4000; + +struct TestCall { + explicit TestCall(grpc_channel* channel, grpc_call* call, + grpc_completion_queue* cq) + : channel(channel), call(call), cq(cq) {} + + TestCall(const TestCall& other) = delete; + TestCall& operator=(const TestCall& other) = delete; + + ~TestCall() { + grpc_call_cancel(call, nullptr); + grpc_call_unref(call); + grpc_channel_destroy(channel); + grpc_completion_queue_shutdown(cq); + while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .type != GRPC_QUEUE_SHUTDOWN) + ; + grpc_completion_queue_destroy(cq); + } + + grpc_channel* channel; + grpc_call* call; + grpc_completion_queue* cq; + absl::optional + status; // filled in when the call is finished +}; + +void StartCall(TestCall* test_call) { + grpc_op ops[6]; + grpc_op* op; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY; + op->reserved = nullptr; + op++; + void* tag = test_call; + grpc_call_error error = grpc_call_start_batch( + test_call->call, ops, static_cast(op - ops), tag, nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_verifier* cqv = cq_verifier_create(test_call->cq); + CQ_EXPECT_COMPLETION(cqv, tag, 1); + cq_verify(cqv); + cq_verifier_destroy(cqv); +} + +void SendMessage(grpc_call* call, grpc_completion_queue* cq) { + grpc_slice request_payload_slice = grpc_slice_from_copied_string("a"); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_op ops[6]; + grpc_op* op; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->reserved = nullptr; + op++; + void* tag = call; + grpc_call_error error = grpc_call_start_batch( + call, ops, static_cast(op - ops), tag, nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_verifier* cqv = cq_verifier_create(cq); + CQ_EXPECT_COMPLETION(cqv, tag, 1); + cq_verify(cqv); + cq_verifier_destroy(cqv); + grpc_byte_buffer_destroy(request_payload); +} + +void ReceiveMessage(grpc_call* call, grpc_completion_queue* cq) { + grpc_byte_buffer* request_payload = nullptr; + grpc_op ops[6]; + grpc_op* op; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload; + op->reserved = nullptr; + op++; + void* tag = call; + grpc_call_error error = grpc_call_start_batch( + call, ops, static_cast(op - ops), tag, nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_verifier* cqv = cq_verifier_create(cq); + CQ_EXPECT_COMPLETION(cqv, tag, 1); + cq_verify(cqv); + cq_verifier_destroy(cqv); + grpc_byte_buffer_destroy(request_payload); +} + +void ReceiveInitialMetadata(TestCall* test_call, gpr_timespec deadline) { + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array_init(&initial_metadata_recv); + grpc_op ops[6]; + grpc_op* op; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->reserved = nullptr; + op++; + void* tag = test_call; + grpc_call_error error = grpc_call_start_batch( + test_call->call, ops, static_cast(op - ops), tag, nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + grpc_event event = + grpc_completion_queue_next(test_call->cq, deadline, nullptr); + if (event.type != GRPC_OP_COMPLETE || !event.success) { + gpr_log(GPR_ERROR, + "Wanted op complete with success, got op type:%d success:%d", + event.type, event.success); + GPR_ASSERT(0); + } + GPR_ASSERT(event.tag == tag); + grpc_metadata_array_destroy(&initial_metadata_recv); +} + +void FinishCall(TestCall* test_call) { + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array trailing_metadata_recv; + grpc_status_code status; + grpc_slice details; + grpc_metadata_array_init(&trailing_metadata_recv); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = nullptr; + op++; + void* tag = test_call; + grpc_call_error error = grpc_call_start_batch( + test_call->call, ops, static_cast(op - ops), tag, nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + grpc_event event = grpc_completion_queue_next( + test_call->cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); + GPR_ASSERT(event.type == GRPC_OP_COMPLETE); + GPR_ASSERT(event.success); + GPR_ASSERT(event.tag == tag); + test_call->status = status; + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_slice_unref(details); +} + +class TestServer { + public: + explicit TestServer() { + cq_ = grpc_completion_queue_create_for_next(nullptr); + server_ = grpc_server_create(nullptr, nullptr); + address_ = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + grpc_server_register_completion_queue(server_, cq_, nullptr); + GPR_ASSERT(grpc_server_add_insecure_http2_port(server_, address_.c_str())); + grpc_server_start(server_); + thread_ = std::thread(std::bind(&TestServer::AcceptThread, this)); + } + + ~TestServer() { + grpc_server_shutdown_and_notify(server_, cq_, nullptr); + thread_.join(); + grpc_server_destroy(server_); + grpc_completion_queue_shutdown(cq_); + while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr) + .type != GRPC_QUEUE_SHUTDOWN) + ; + grpc_completion_queue_destroy(cq_); + } + + std::string address() const { return address_; } + + private: + void AcceptThread() { + grpc_call_details call_details; + grpc_call_details_init(&call_details); + grpc_metadata_array request_metadata_recv; + grpc_metadata_array_init(&request_metadata_recv); + void* tag = this; + grpc_call* call; + grpc_call_error error = grpc_server_request_call( + server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag); + GPR_ASSERT(error == GRPC_CALL_OK); + grpc_event event = grpc_completion_queue_next( + cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); + GPR_ASSERT(event.type == GRPC_OP_COMPLETE); + GPR_ASSERT(event.success); + GPR_ASSERT(event.tag == tag); + grpc_op ops[6]; + grpc_op* op; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(call, ops, static_cast(op - ops), tag, + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr); + GPR_ASSERT(event.type == GRPC_OP_COMPLETE); + GPR_ASSERT(event.success); + GPR_ASSERT(event.tag == tag); + for (int i = 0; i < kNumMessagePingPongsPerCall; i++) { + ReceiveMessage(call, cq_); + SendMessage(call, cq_); + } + grpc_call_cancel_with_status(call, GRPC_STATUS_PERMISSION_DENIED, + "test status", nullptr); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_call_unref(call); + } + + grpc_server* server_; + grpc_completion_queue* cq_; + std::string address_; + std::thread thread_; +}; + +grpc_core::Resolver::Result BuildResolverResponse( + const std::vector& addresses) { + grpc_core::Resolver::Result result; + for (const auto& address_str : addresses) { + grpc_uri* uri = grpc_uri_parse(address_str.c_str(), true); + if (uri == nullptr) { + gpr_log(GPR_ERROR, "Failed to parse uri:%s", address_str.c_str()); + GPR_ASSERT(0); + } + grpc_resolved_address address; + GPR_ASSERT(grpc_parse_uri(uri, &address)); + result.addresses.emplace_back(address.addr, address.len, nullptr); + grpc_uri_destroy(uri); + } + return result; +} + +// Perform a simple RPC where the server cancels the request with +// grpc_call_cancel_with_status +TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) { + gpr_log(GPR_DEBUG, "test thread"); + /* 64 is a somewhat arbitary number, the important thing is that it + * exceeds the value of MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL (16), which + * is enough to repro a bug at time of writing. */ + const int kNumCalls = 64; + size_t ping_pong_round = 0; + size_t ping_pongs_done = 0; + grpc_core::Mutex ping_pong_round_mu; + grpc_core::CondVar ping_pong_round_cv; + const std::string kSharedUnconnectableAddress = + grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die()); + gpr_log(GPR_DEBUG, "created unconnectable address:%s", + kSharedUnconnectableAddress.c_str()); + std::vector threads; + threads.reserve(kNumCalls); + for (int i = 0; i < kNumCalls; i++) { + threads.push_back(std::thread([kSharedUnconnectableAddress, + &ping_pong_round, &ping_pongs_done, + &ping_pong_round_mu, &ping_pong_round_cv]() { + auto test_server = absl::make_unique(); + gpr_log(GPR_DEBUG, "created test_server with address:%s", + test_server->address().c_str()); + std::vector args; + grpc_arg service_config_arg; + service_config_arg.type = GRPC_ARG_STRING; + service_config_arg.key = const_cast(GRPC_ARG_SERVICE_CONFIG); + service_config_arg.value.string = + const_cast("{\"loadBalancingConfig\":[{\"round_robin\":{}}]}"); + args.push_back(service_config_arg); + auto fake_resolver_response_generator = + grpc_core::MakeRefCounted(); + { + grpc_core::ExecCtx exec_ctx; + fake_resolver_response_generator->SetResponse(BuildResolverResponse( + {absl::StrCat("ipv4:", kSharedUnconnectableAddress), + absl::StrCat("ipv4:", test_server->address())})); + } + args.push_back(grpc_core::FakeResolverResponseGenerator::MakeChannelArg( + fake_resolver_response_generator.get())); + grpc_channel_args* channel_args = + grpc_channel_args_copy_and_add(nullptr, args.data(), args.size()); + grpc_channel* channel = grpc_insecure_channel_create( + "fake:///test.server.com", channel_args, nullptr); + grpc_channel_args_destroy(channel_args); + grpc_completion_queue* cq = + grpc_completion_queue_create_for_next(nullptr); + grpc_call* call = grpc_channel_create_call( + channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, + grpc_slice_from_static_string("/foo"), nullptr, + gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); + auto test_call = absl::make_unique(channel, call, cq); + // Start a call, and ensure that round_robin load balancing is configured + StartCall(test_call.get()); + // Make sure the test is doing what it's meant to be doing + grpc_channel_info channel_info; + memset(&channel_info, 0, sizeof(channel_info)); + char* lb_policy_name = nullptr; + channel_info.lb_policy_name = &lb_policy_name; + grpc_channel_get_info(channel, &channel_info); + EXPECT_EQ(std::string(lb_policy_name), "round_robin") + << "not using round robin; this test has a low chance of hitting the " + "bug that it's meant to try to hit"; + gpr_free(lb_policy_name); + // Receive initial metadata + gpr_log(GPR_DEBUG, + "now receive initial metadata on call with server address:%s", + test_server->address().c_str()); + ReceiveInitialMetadata(test_call.get(), + grpc_timeout_seconds_to_deadline(30)); + for (int i = 1; i <= kNumMessagePingPongsPerCall; i++) { + { + grpc_core::MutexLock lock(&ping_pong_round_mu); + ping_pong_round_cv.Broadcast(); + while (ping_pong_round != i) { + ping_pong_round_cv.Wait(&ping_pong_round_mu); + } + } + SendMessage(test_call->call, test_call->cq); + ReceiveMessage(test_call->call, test_call->cq); + { + grpc_core::MutexLock lock(&ping_pong_round_mu); + ping_pongs_done++; + ping_pong_round_cv.Broadcast(); + } + } + gpr_log(GPR_DEBUG, "now receive status on call with server address:%s", + test_server->address().c_str()); + FinishCall(test_call.get()); + GPR_ASSERT(test_call->status.has_value()); + GPR_ASSERT(test_call->status.value() == GRPC_STATUS_PERMISSION_DENIED); + { + grpc_core::ExecCtx exec_ctx; + fake_resolver_response_generator.reset(); + } + })); + } + for (size_t i = 1; i <= kNumMessagePingPongsPerCall; i++) { + { + grpc_core::MutexLock lock(&ping_pong_round_mu); + while (ping_pongs_done < ping_pong_round * kNumCalls) { + ping_pong_round_cv.Wait(&ping_pong_round_mu); + } + ping_pong_round++; + ping_pong_round_cv.Broadcast(); + gpr_log(GPR_DEBUG, "initiate ping pong round: %ld", ping_pong_round); + } + } + for (auto& thread : threads) { + thread.join(); + } + gpr_log(GPR_DEBUG, "All RPCs completed!"); +} + +} // namespace + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + grpc_init(); + auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index e5db6477cf4..6227235d23b 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5597,6 +5597,28 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "stranded_event_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,