diff --git a/CMakeLists.txt b/CMakeLists.txt index d0973cffe4e..88476b9185c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -708,6 +708,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx connectivity_state_test) add_dependencies(buildtests_cxx context_list_test) add_dependencies(buildtests_cxx delegating_channel_test) + add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test) add_dependencies(buildtests_cxx duplicate_header_bad_client_test) add_dependencies(buildtests_cxx end2end_test) add_dependencies(buildtests_cxx error_details_test) @@ -10157,6 +10158,46 @@ target_link_libraries(delegating_channel_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(destroy_grpclb_channel_with_active_connect_stress_test + test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(destroy_grpclb_channel_with_active_connect_stress_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(destroy_grpclb_channel_with_active_connect_stress_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr + address_sorting + upb + ${_gRPC_GFLAGS_LIBRARIES} +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 2b6f2efe939..b04b2b91f64 100644 --- a/Makefile +++ b/Makefile @@ -1204,6 +1204,7 @@ connection_prefix_bad_client_test: $(BINDIR)/$(CONFIG)/connection_prefix_bad_cli connectivity_state_test: 
$(BINDIR)/$(CONFIG)/connectivity_state_test context_list_test: $(BINDIR)/$(CONFIG)/context_list_test delegating_channel_test: $(BINDIR)/$(CONFIG)/delegating_channel_test +destroy_grpclb_channel_with_active_connect_stress_test: $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test duplicate_header_bad_client_test: $(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test end2end_test: $(BINDIR)/$(CONFIG)/end2end_test error_details_test: $(BINDIR)/$(CONFIG)/error_details_test @@ -1580,6 +1581,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/connectivity_state_test \ $(BINDIR)/$(CONFIG)/context_list_test \ $(BINDIR)/$(CONFIG)/delegating_channel_test \ + $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test \ $(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test \ $(BINDIR)/$(CONFIG)/end2end_test \ $(BINDIR)/$(CONFIG)/error_details_test \ @@ -1735,6 +1737,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/connectivity_state_test \ $(BINDIR)/$(CONFIG)/context_list_test \ $(BINDIR)/$(CONFIG)/delegating_channel_test \ + $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test \ $(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test \ $(BINDIR)/$(CONFIG)/end2end_test \ $(BINDIR)/$(CONFIG)/error_details_test \ @@ -2225,6 +2228,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/context_list_test || ( echo test context_list_test failed ; exit 1 ) $(E) "[RUN] Testing delegating_channel_test" $(Q) $(BINDIR)/$(CONFIG)/delegating_channel_test || ( echo test delegating_channel_test failed ; exit 1 ) + $(E) "[RUN] Testing destroy_grpclb_channel_with_active_connect_stress_test" + $(Q) $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test || ( echo test destroy_grpclb_channel_with_active_connect_stress_test failed ; exit 1 ) $(E) "[RUN] Testing duplicate_header_bad_client_test" $(Q) $(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test || ( echo test duplicate_header_bad_client_test failed ; 
exit 1 ) $(E) "[RUN] Testing end2end_test" @@ -13816,6 +13821,49 @@ $(OBJDIR)/$(CONFIG)/test/cpp/end2end/delegating_channel_test.o: $(GENDIR)/src/pr $(OBJDIR)/$(CONFIG)/test/cpp/end2end/test_service_impl.o: $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.grpc.pb.cc +DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_SRC = \ + test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc \ + +DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. 
+ +$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test: $(PROTOBUF_DEP) $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +deps_destroy_grpclb_channel_with_active_connect_stress_test: $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS:.o=.dep) +endif +endif + + DUPLICATE_HEADER_BAD_CLIENT_TEST_SRC = \ test/core/bad_client/bad_client.cc \ test/core/bad_client/tests/duplicate_header.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b0d6e8a2ef6..b6538f041d2 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5609,6 +5609,21 
@@ targets:
   - gpr
   - address_sorting
   - upb
+- name: destroy_grpclb_channel_with_active_connect_stress_test
+  gtest: true
+  build: test
+  language: c++
+  headers: []
+  src:
+  - test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc
+  deps:
+  - grpc++_test_util
+  - grpc_test_util
+  - grpc++
+  - grpc
+  - gpr
+  - address_sorting
+  - upb
 - name: duplicate_header_bad_client_test
   gtest: true
   build: test
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 0e2d6cc7fb5..64bc0039923 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -306,6 +306,40 @@ class GrpcLb : public LoadBalancingPolicy {
     LoadBalancingPolicy* child_ = nullptr;
   };
 
+  // Watches the connectivity state of the balancer channel. If the channel
+  // reports TRANSIENT_FAILURE while the startup fallback checks are still
+  // pending, enters fallback mode immediately and cancels the watch.
+  class StateWatcher : public AsyncConnectivityStateWatcherInterface {
+   public:
+    explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
+        : AsyncConnectivityStateWatcherInterface(parent->combiner()),
+          parent_(std::move(parent)) {}
+
+    ~StateWatcher() { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }
+
+   private:
+    void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
+      if (parent_->fallback_at_startup_checks_pending_ &&
+          new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+        // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
+        // fallback mode immediately.
+        gpr_log(GPR_INFO,
+                "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
+                "entering fallback mode",
+                parent_.get());
+        parent_->fallback_at_startup_checks_pending_ = false;
+        grpc_timer_cancel(&parent_->lb_fallback_timer_);
+        parent_->fallback_mode_ = true;
+        parent_->CreateOrUpdateChildPolicyLocked();
+        // Cancel the watch, since we don't care about the channel state once we
+        // go into fallback mode.
+        parent_->CancelBalancerChannelConnectivityWatchLocked();
+      }
+    }
+
+    RefCountedPtr<GrpcLb> parent_;
+  };
+
   ~GrpcLb();
 
   void ShutdownLocked() override;
@@ -313,10 +347,6 @@ class GrpcLb : public LoadBalancingPolicy {
   // Helper functions used in UpdateLocked().
   void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
                                             const grpc_channel_args& args);
-  static void OnBalancerChannelConnectivityChanged(void* arg,
-                                                   grpc_error* error);
-  static void OnBalancerChannelConnectivityChangedLocked(void* arg,
-                                                         grpc_error* error);
   void CancelBalancerChannelConnectivityWatchLocked();
 
   // Methods for dealing with fallback state.
@@ -348,6 +378,7 @@ class GrpcLb : public LoadBalancingPolicy {
 
   // The channel for communicating with the LB server.
   grpc_channel* lb_channel_ = nullptr;
+  StateWatcher* watcher_ = nullptr;
   // Response generator to inject address updates into lb_channel_.
   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
 
@@ -380,8 +411,6 @@ class GrpcLb : public LoadBalancingPolicy {
   bool fallback_at_startup_checks_pending_ = false;
   grpc_timer lb_fallback_timer_;
   grpc_closure lb_on_fallback_;
-  grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
-  grpc_closure lb_channel_on_connectivity_changed_;
 
   // The child policy to use for the backends.
   OrphanablePtr<LoadBalancingPolicy> child_policy_;
@@ -1405,6 +1434,7 @@ void GrpcLb::ShutdownLocked() {
     grpc_timer_cancel(&lb_call_retry_timer_);
   }
   if (fallback_at_startup_checks_pending_) {
+    fallback_at_startup_checks_pending_ = false;
     grpc_timer_cancel(&lb_fallback_timer_);
     CancelBalancerChannelConnectivityWatchLocked();
   }
@@ -1472,15 +1502,10 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
         grpc_channel_get_channel_stack(lb_channel_));
     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
     // Ref held by callback.
-    Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
-    GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
-                      &GrpcLb::OnBalancerChannelConnectivityChanged, this,
-                      grpc_schedule_on_exec_ctx);
-    grpc_client_channel_watch_connectivity_state(
-        client_channel_elem,
-        grpc_polling_entity_create_from_pollset_set(interested_parties()),
-        &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
-        nullptr);
+    watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
+    grpc_client_channel_start_connectivity_watch(
+        client_channel_elem, GRPC_CHANNEL_IDLE,
+        OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
     // Start balancer call.
     StartBalancerCallLocked();
   }
@@ -1539,60 +1564,11 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
   response_generator_->SetResponse(std::move(result));
 }
 
-void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg,
-                                                  grpc_error* error) {
-  GrpcLb* self = static_cast<GrpcLb*>(arg);
-  self->combiner()->Run(
-      GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
-                        &GrpcLb::OnBalancerChannelConnectivityChangedLocked,
-                        self, nullptr),
-      GRPC_ERROR_REF(error));
-}
-
-void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
-                                                        grpc_error* /*error*/) {
-  GrpcLb* self = static_cast<GrpcLb*>(arg);
-  if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
-    if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
-      // Not in TRANSIENT_FAILURE. Renew connectivity watch.
- grpc_channel_element* client_channel_elem = - grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(self->lb_channel_)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_, - &GrpcLb::OnBalancerChannelConnectivityChanged, self, - grpc_schedule_on_exec_ctx); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - self->interested_parties()), - &self->lb_channel_connectivity_, - &self->lb_channel_on_connectivity_changed_, nullptr); - return; // Early out so we don't drop the ref below. - } - // In TRANSIENT_FAILURE. Cancel the fallback timer and go into - // fallback mode immediately. - gpr_log(GPR_INFO, - "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; " - "entering fallback mode", - self); - self->fallback_at_startup_checks_pending_ = false; - grpc_timer_cancel(&self->lb_fallback_timer_); - self->fallback_mode_ = true; - self->CreateOrUpdateChildPolicyLocked(); - } - // Done watching connectivity state, so drop ref. 
- self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity"); -} - void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(lb_channel_)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set(interested_parties()), - nullptr, &lb_channel_on_connectivity_changed_, nullptr); + grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_); } // diff --git a/test/cpp/client/BUILD b/test/cpp/client/BUILD index 5a80c112308..b895d99724f 100644 --- a/test/cpp/client/BUILD +++ b/test/cpp/client/BUILD @@ -55,3 +55,17 @@ grpc_cc_test( "//test/cpp/util:test_util", ], ) + +grpc_cc_test( + name = "destroy_grpclb_channel_with_active_connect_stress_test", + srcs = ["destroy_grpclb_channel_with_active_connect_stress_test.cc"], + external_deps = ["gtest"], + deps = [ + "//:gpr", + "//:grpc", + "//:grpc++", + "//:grpc_resolver_fake", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) diff --git a/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc b/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc new file mode 100644 index 00000000000..7ba76f0da75 --- /dev/null +++ b/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc @@ -0,0 +1,129 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/thd.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+namespace {
+
+void TryConnectAndDestroy() {
+  auto response_generator =
+      grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+  // Return a grpclb address with an IP address on the IPv6 discard prefix
+  // (https://tools.ietf.org/html/rfc6666). This is important because
+  // the behavior we want in this test is for a TCP connect attempt to "hang",
+  // i.e. we want to send SYN, and then *not* receive SYN-ACK or RST.
+  // The precise behavior is dependent on the test runtime environment though,
+  // since connect() attempts on this address may unfortunately result in
+  // "network unreachable" errors in some test runtime environments.
+  char* uri_str;
+  gpr_asprintf(&uri_str, "ipv6:[0100::1234]:443");
+  grpc_uri* lb_uri = grpc_uri_parse(uri_str, true);
+  gpr_free(uri_str);
+  GPR_ASSERT(lb_uri != nullptr);
+  grpc_resolved_address address;
+  GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+  std::vector<grpc_arg> address_args_to_add = {
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1),
+  };
+  grpc_core::ServerAddressList addresses;
+  grpc_channel_args* address_args = grpc_channel_args_copy_and_add(
+      nullptr, address_args_to_add.data(), address_args_to_add.size());
+  addresses.emplace_back(address.addr, address.len, address_args);
+  grpc_core::Resolver::Result lb_address_result;
+  lb_address_result.addresses = addresses;
+  grpc_uri_destroy(lb_uri);
+  response_generator->SetResponse(lb_address_result);
+  grpc::ChannelArguments args;
+  args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+                  response_generator.get());
+  // Explicitly set the connect deadline to the same amount of
+  // time as the WaitForConnected time. The goal is to get the
+  // connect timeout code to run at about the same time as when
+  // the channel gets destroyed, to try to reproduce a race.
+  args.SetInt("grpc.testing.fixed_reconnect_backoff_ms",
+              grpc_test_slowdown_factor() * 100);
+  std::ostringstream uri;
+  uri << "fake:///servername_not_used";
+  auto channel = ::grpc::CreateCustomChannel(
+      uri.str(), grpc::InsecureChannelCredentials(), args);
+  // Start connecting, and give some time for the TCP connection attempt to the
+  // unreachable balancer to begin. The connection should never become ready
+  // because the LB we're trying to connect to is unreachable.
+  channel->GetState(true /* try_to_connect */);
+  GPR_ASSERT(
+      !channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100)));
+  GPR_ASSERT("grpclb" == channel->GetLoadBalancingPolicyName());
+  channel.reset();
+}
+
+TEST(DestroyGrpclbChannelWithActiveConnectStressTest,
+     LoopTryConnectAndDestroy) {
+  grpc_init();
+  std::vector<std::unique_ptr<std::thread>> threads;
+  // 100 is picked for number of threads just
+  // because it's enough to reproduce a certain crash almost 100%
+  // at this time of writing.
+  const int kNumThreads = 100;
+  threads.reserve(kNumThreads);
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back(new std::thread(TryConnectAndDestroy));
+  }
+  for (auto& thread : threads) {
+    thread->join();
+  }
+  grpc_shutdown();
+}
+
+}  // namespace
+
+int main(int argc, char** argv) {
+  grpc::testing::TestEnvironment env(argc, argv);
+  ::testing::InitGoogleTest(&argc, argv);
+  auto result = RUN_ALL_TESTS();
+  return result;
+}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 16ecadb5680..71e8ad42f12 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -4405,6 +4405,30 @@
     ],
     "uses_polling": true
   },
+  {
+    "args": [],
+    "benchmark": false,
+    "ci_platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "cpu_cost": 1.0,
+    "exclude_configs": [],
+    "exclude_iomgrs": [],
+    "flaky": false,
+    "gtest": true,
+    "language": "c++",
+    "name": "destroy_grpclb_channel_with_active_connect_stress_test",
+    "platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "uses_polling": true
+  },
   {
     "args": [],
     "benchmark": false,