Merge pull request #22133 from apolcyn/use_internal_watch_state_with_grpclb

Change grpclb policy to use internal connectivity state watch API
pull/21606/head
apolcyn 5 years ago committed by GitHub
commit 5b9b5ebe26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      CMakeLists.txt
  2. 48
      Makefile
  3. 15
      build_autogenerated.yaml
  4. 103
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 14
      test/cpp/client/BUILD
  6. 129
      test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc
  7. 24
      tools/run_tests/generated/tests.json

@ -711,6 +711,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx connectivity_state_test)
add_dependencies(buildtests_cxx context_list_test)
add_dependencies(buildtests_cxx delegating_channel_test)
add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test)
add_dependencies(buildtests_cxx duplicate_header_bad_client_test)
add_dependencies(buildtests_cxx end2end_test)
add_dependencies(buildtests_cxx error_details_test)
@ -10258,6 +10259,46 @@ target_link_libraries(delegating_channel_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(destroy_grpclb_channel_with_active_connect_stress_test
test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(destroy_grpclb_channel_with_active_connect_stress_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(destroy_grpclb_channel_with_active_connect_stress_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
grpc_test_util
grpc++
grpc
gpr
address_sorting
upb
${_gRPC_GFLAGS_LIBRARIES}
)
endif()
if(gRPC_BUILD_TESTS)

@ -1204,6 +1204,7 @@ connection_prefix_bad_client_test: $(BINDIR)/$(CONFIG)/connection_prefix_bad_cli
connectivity_state_test: $(BINDIR)/$(CONFIG)/connectivity_state_test
context_list_test: $(BINDIR)/$(CONFIG)/context_list_test
delegating_channel_test: $(BINDIR)/$(CONFIG)/delegating_channel_test
destroy_grpclb_channel_with_active_connect_stress_test: $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test
duplicate_header_bad_client_test: $(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test
end2end_test: $(BINDIR)/$(CONFIG)/end2end_test
error_details_test: $(BINDIR)/$(CONFIG)/error_details_test
@ -1580,6 +1581,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/connectivity_state_test \
$(BINDIR)/$(CONFIG)/context_list_test \
$(BINDIR)/$(CONFIG)/delegating_channel_test \
$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test \
$(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test \
$(BINDIR)/$(CONFIG)/end2end_test \
$(BINDIR)/$(CONFIG)/error_details_test \
@ -1735,6 +1737,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/connectivity_state_test \
$(BINDIR)/$(CONFIG)/context_list_test \
$(BINDIR)/$(CONFIG)/delegating_channel_test \
$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test \
$(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test \
$(BINDIR)/$(CONFIG)/end2end_test \
$(BINDIR)/$(CONFIG)/error_details_test \
@ -2225,6 +2228,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/context_list_test || ( echo test context_list_test failed ; exit 1 )
$(E) "[RUN] Testing delegating_channel_test"
$(Q) $(BINDIR)/$(CONFIG)/delegating_channel_test || ( echo test delegating_channel_test failed ; exit 1 )
$(E) "[RUN] Testing destroy_grpclb_channel_with_active_connect_stress_test"
$(Q) $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test || ( echo test destroy_grpclb_channel_with_active_connect_stress_test failed ; exit 1 )
$(E) "[RUN] Testing duplicate_header_bad_client_test"
$(Q) $(BINDIR)/$(CONFIG)/duplicate_header_bad_client_test || ( echo test duplicate_header_bad_client_test failed ; exit 1 )
$(E) "[RUN] Testing end2end_test"
@ -13918,6 +13923,49 @@ $(OBJDIR)/$(CONFIG)/test/cpp/end2end/delegating_channel_test.o: $(GENDIR)/src/pr
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/test_service_impl.o: $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.grpc.pb.cc
DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_SRC = \
test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc \
DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test: $(PROTOBUF_DEP) $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/destroy_grpclb_channel_with_active_connect_stress_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a
deps_destroy_grpclb_channel_with_active_connect_stress_test: $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(DESTROY_GRPCLB_CHANNEL_WITH_ACTIVE_CONNECT_STRESS_TEST_OBJS:.o=.dep)
endif
endif
DUPLICATE_HEADER_BAD_CLIENT_TEST_SRC = \
test/core/bad_client/bad_client.cc \
test/core/bad_client/tests/duplicate_header.cc \

@ -5693,6 +5693,21 @@ targets:
- gpr
- address_sorting
- upb
- name: destroy_grpclb_channel_with_active_connect_stress_test
gtest: true
build: test
language: c++
headers: []
src:
- test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc
deps:
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr
- address_sorting
- upb
- name: duplicate_header_bad_client_test
gtest: true
build: test

@ -301,6 +301,37 @@ class GrpcLb : public LoadBalancingPolicy {
RefCountedPtr<GrpcLb> parent_;
};
class StateWatcher : public AsyncConnectivityStateWatcherInterface {
public:
explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
: AsyncConnectivityStateWatcherInterface(parent->combiner()),
parent_(std::move(parent)) {}
~StateWatcher() { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
if (parent_->fallback_at_startup_checks_pending_ &&
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
parent_.get());
parent_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&parent_->lb_fallback_timer_);
parent_->fallback_mode_ = true;
parent_->CreateOrUpdateChildPolicyLocked();
// Cancel the watch, since we don't care about the channel state once we
// go into fallback mode.
parent_->CancelBalancerChannelConnectivityWatchLocked();
}
}
RefCountedPtr<GrpcLb> parent_;
};
~GrpcLb();
void ShutdownLocked() override;
@ -308,10 +339,6 @@ class GrpcLb : public LoadBalancingPolicy {
// Helper functions used in UpdateLocked().
void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
const grpc_channel_args& args);
static void OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error);
static void OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error);
void CancelBalancerChannelConnectivityWatchLocked();
// Methods for dealing with fallback state.
@ -343,6 +370,7 @@ class GrpcLb : public LoadBalancingPolicy {
// The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr;
StateWatcher* watcher_ = nullptr;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
@ -375,8 +403,6 @@ class GrpcLb : public LoadBalancingPolicy {
bool fallback_at_startup_checks_pending_ = false;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
grpc_closure lb_channel_on_connectivity_changed_;
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
@ -1353,6 +1379,7 @@ void GrpcLb::ShutdownLocked() {
grpc_timer_cancel(&lb_call_retry_timer_);
}
if (fallback_at_startup_checks_pending_) {
fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&lb_fallback_timer_);
CancelBalancerChannelConnectivityWatchLocked();
}
@ -1412,15 +1439,10 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback.
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChanged, this,
grpc_schedule_on_exec_ctx);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
nullptr);
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
grpc_client_channel_start_connectivity_watch(
client_channel_elem, GRPC_CHANNEL_IDLE,
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
// Start balancer call.
StartBalancerCallLocked();
}
@ -1479,60 +1501,11 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
response_generator_->SetResponse(std::move(result));
}
void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error) {
GrpcLb* self = static_cast<GrpcLb*>(arg);
self->combiner()->Run(
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked,
self, nullptr),
GRPC_ERROR_REF(error));
}
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* /*error*/) {
GrpcLb* self = static_cast<GrpcLb*>(arg);
if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(self->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChanged, self,
grpc_schedule_on_exec_ctx);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
self->interested_parties()),
&self->lb_channel_connectivity_,
&self->lb_channel_on_connectivity_changed_, nullptr);
return; // Early out so we don't drop the ref below.
}
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
self);
self->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&self->lb_fallback_timer_);
self->fallback_mode_ = true;
self->CreateOrUpdateChildPolicyLocked();
}
// Done watching connectivity state, so drop ref.
self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
}
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
nullptr, &lb_channel_on_connectivity_changed_, nullptr);
grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
}
//

@ -55,3 +55,17 @@ grpc_cc_test(
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "destroy_grpclb_channel_with_active_connect_stress_test",
srcs = ["destroy_grpclb_channel_with_active_connect_stress_test.cc"],
external_deps = ["gtest"],
deps = [
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpc_resolver_fake",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)

@ -0,0 +1,129 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <atomic>
#include <memory>
#include <mutex>
#include <random>
#include <sstream>
#include <thread>
#include <gmock/gmock.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
namespace {
void TryConnectAndDestroy() {
auto response_generator =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
// Return a grpclb address with an IP address on the IPv6 discard prefix
// (https://tools.ietf.org/html/rfc6666). This is important because
// the behavior we want in this test is for a TCP connect attempt to "hang",
// i.e. we want to send SYN, and then *not* receive SYN-ACK or RST.
// The precise behavior is dependant on the test runtime environment though,
// since connect() attempts on this address may unfortunately result in
// "network unreachable" errors in some test runtime environments.
char* uri_str;
gpr_asprintf(&uri_str, "ipv6:[0100::1234]:443");
grpc_uri* lb_uri = grpc_uri_parse(uri_str, true);
gpr_free(uri_str);
GPR_ASSERT(lb_uri != nullptr);
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
std::vector<grpc_arg> address_args_to_add = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BALANCER), 1),
};
grpc_core::ServerAddressList addresses;
grpc_channel_args* address_args = grpc_channel_args_copy_and_add(
nullptr, address_args_to_add.data(), address_args_to_add.size());
addresses.emplace_back(address.addr, address.len, address_args);
grpc_core::Resolver::Result lb_address_result;
lb_address_result.addresses = addresses;
grpc_uri_destroy(lb_uri);
response_generator->SetResponse(lb_address_result);
grpc::ChannelArguments args;
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator.get());
// Explicitly set the connect deadline to the same amount of
// time as the WaitForConnected time. The goal is to get the
// connect timeout code to run at about the same time as when
// the channel gets destroyed, to try to reproduce a race.
args.SetInt("grpc.testing.fixed_reconnect_backoff_ms",
grpc_test_slowdown_factor() * 100);
std::ostringstream uri;
uri << "fake:///servername_not_used";
auto channel = ::grpc::CreateCustomChannel(
uri.str(), grpc::InsecureChannelCredentials(), args);
// Start connecting, and give some time for the TCP connection attempt to the
// unreachable balancer to begin. The connection should never become ready
// because the LB we're trying to connect to is unreachable.
channel->GetState(true /* try_to_connect */);
GPR_ASSERT(
!channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(100)));
GPR_ASSERT("grpclb" == channel->GetLoadBalancingPolicyName());
channel.reset();
};
TEST(DestroyGrpclbChannelWithActiveConnectStressTest,
LoopTryConnectAndDestroy) {
grpc_init();
std::vector<std::unique_ptr<std::thread>> threads;
// 100 is picked for number of threads just
// because it's enough to reproduce a certain crash almost 100%
// at this time of writing.
const int kNumThreads = 100;
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
threads.emplace_back(new std::thread(TryConnectAndDestroy));
}
for (int i = 0; i < threads.size(); i++) {
threads[i]->join();
}
grpc_shutdown();
}
} // namespace
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
auto result = RUN_ALL_TESTS();
return result;
}

@ -4405,6 +4405,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "destroy_grpclb_channel_with_active_connect_stress_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save