From e889fda4828872915229af82c286ae2be613f731 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Tue, 5 Mar 2019 13:29:55 -0800 Subject: [PATCH] Use real resolver in xds lb channel --- CMakeLists.txt | 48 + Makefile | 52 + build.yaml | 13 + include/grpc/impl/codegen/grpc_types.h | 4 + .../ext/filters/client_channel/lb_policy.h | 4 +- .../client_channel/lb_policy/xds/xds.cc | 716 +++++----- .../lb_policy/xds/xds_channel_secure.cc | 43 - .../resolver/fake/fake_resolver.cc | 34 +- .../resolver/fake/fake_resolver.h | 5 +- .../fake/fake_security_connector.cc | 9 +- test/cpp/end2end/BUILD | 22 + test/cpp/end2end/xds_end2end_test.cc | 1214 +++++++++++++++++ .../generated/sources_and_headers.json | 22 + tools/run_tests/generated/tests.json | 24 + 14 files changed, 1791 insertions(+), 419 deletions(-) create mode 100644 test/cpp/end2end/xds_end2end_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 939e83c481f..bccead24f28 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -720,6 +720,7 @@ add_dependencies(buildtests_cxx transport_security_common_api_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx writes_per_rpc_test) endif() +add_dependencies(buildtests_cxx xds_end2end_test) add_dependencies(buildtests_cxx resolver_component_test_unsecure) add_dependencies(buildtests_cxx resolver_component_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -16232,6 +16233,53 @@ endif() endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(xds_end2end_test + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.h + test/cpp/end2end/xds_end2end_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +protobuf_generate_grpc_cpp( + src/proto/grpc/lb/v1/load_balancer.proto +) + +target_include_directories(xds_end2end_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(xds_end2end_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(public_headers_must_be_c89 test/core/surface/public_headers_must_be_c89.c ) diff --git a/Makefile b/Makefile index 3c890797431..94944265d61 100644 --- a/Makefile +++ b/Makefile @@ -1276,6 +1276,7 @@ time_change_test: $(BINDIR)/$(CONFIG)/time_change_test transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test transport_security_common_api_test: $(BINDIR)/$(CONFIG)/transport_security_common_api_test writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test +xds_end2end_test: $(BINDIR)/$(CONFIG)/xds_end2end_test 
public_headers_must_be_c89: $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 gen_hpack_tables: $(BINDIR)/$(CONFIG)/gen_hpack_tables gen_legal_metadata_characters: $(BINDIR)/$(CONFIG)/gen_legal_metadata_characters @@ -1787,6 +1788,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \ $(BINDIR)/$(CONFIG)/transport_security_common_api_test \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \ + $(BINDIR)/$(CONFIG)/xds_end2end_test \ $(BINDIR)/$(CONFIG)/boringssl_crypto_test_data \ $(BINDIR)/$(CONFIG)/boringssl_asn1_test \ $(BINDIR)/$(CONFIG)/boringssl_base64_test \ @@ -1976,6 +1978,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \ $(BINDIR)/$(CONFIG)/transport_security_common_api_test \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \ + $(BINDIR)/$(CONFIG)/xds_end2end_test \ $(BINDIR)/$(CONFIG)/resolver_component_test_unsecure \ $(BINDIR)/$(CONFIG)/resolver_component_test \ $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker_unsecure \ @@ -2500,6 +2503,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/transport_security_common_api_test || ( echo test transport_security_common_api_test failed ; exit 1 ) $(E) "[RUN] Testing writes_per_rpc_test" $(Q) $(BINDIR)/$(CONFIG)/writes_per_rpc_test || ( echo test writes_per_rpc_test failed ; exit 1 ) + $(E) "[RUN] Testing xds_end2end_test" + $(Q) $(BINDIR)/$(CONFIG)/xds_end2end_test || ( echo test xds_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing resolver_component_tests_runner_invoker_unsecure" $(Q) $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker_unsecure || ( echo test resolver_component_tests_runner_invoker_unsecure failed ; exit 1 ) $(E) "[RUN] Testing resolver_component_tests_runner_invoker" @@ -21308,6 +21313,53 @@ endif endif +XDS_END2END_TEST_SRC = \ + $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc \ + test/cpp/end2end/xds_end2end_test.cc \ + +XDS_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(XDS_END2END_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/xds_end2end_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. 
+ +$(BINDIR)/$(CONFIG)/xds_end2end_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/xds_end2end_test: $(PROTOBUF_DEP) $(XDS_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(XDS_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/xds_end2end_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/src/proto/grpc/lb/v1/load_balancer.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_xds_end2end_test: $(XDS_END2END_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(XDS_END2END_TEST_OBJS:.o=.dep) +endif +endif +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc + + PUBLIC_HEADERS_MUST_BE_C89_SRC = \ test/core/surface/public_headers_must_be_c89.c \ diff --git a/build.yaml b/build.yaml index c18630ecdd3..621b9a4de2f 100644 --- a/build.yaml +++ b/build.yaml @@ -5644,6 +5644,19 @@ targets: - mac - linux - posix +- name: xds_end2end_test + gtest: true + build: test + language: c++ + src: + - src/proto/grpc/lb/v1/load_balancer.proto + - test/cpp/end2end/xds_end2end_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr - name: public_headers_must_be_c89 build: test language: c89 diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 79b182c4515..078db2b90a8 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -317,6 +317,10 @@ typedef struct { balancer before using fallback backend addresses from the resolver. If 0, fallback will never be used. Default value is 10000. */ #define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms" +/* Timeout in milliseconds to wait for the serverlist from the xDS load + balancer before using fallback backend addresses from the resolver. + If 0, fallback will never be used. Default value is 10000. */ +#define GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS "grpc.xds_fallback_timeout_ms" /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ "grpc.workaround.cronet_compression" diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 7a876966524..75dca52a615 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -297,8 +297,8 @@ class LoadBalancingPolicy : public InternallyRefCounted { grpc_combiner* combiner() const { return combiner_; } - // Note: LB policies MUST NOT call any method on the helper from - // their constructor. 
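
The GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS arg introduced in the grpc_types.h hunk above is a plain integer channel arg, so an application can tune the fallback window when it creates a channel. A minimal sketch against the public C++ API (the target string and credentials are placeholders, not from this patch):

#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>

std::shared_ptr<grpc::Channel> MakeChannelWithShortFallback() {
  grpc::ChannelArguments args;
  // Fall back to resolver-provided backends after 5s without a serverlist.
  args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, 5000);
  return grpc::CreateCustomChannel("dns:///my-service.example.com:443",
                                   grpc::InsecureChannelCredentials(), args);
}
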
+ // Note: LB policies MUST NOT call any method on the helper from their + // constructor. // Note: This will return null after ShutdownLocked() has been called. ChannelControlHelper* channel_control_helper() const { return channel_control_helper_.get(); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 6c10d876af7..5153330a84e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -129,78 +129,128 @@ class XdsLb : public LoadBalancingPolicy { channelz::ChildRefsList* child_channels) override; private: - /// Contains a call to the LB server and all the data related to the call. - class BalancerCallState : public InternallyRefCounted { + /// Contains a channel to the LB server and all the data related to the + /// channel. + class BalancerChannelState + : public InternallyRefCounted { public: - explicit BalancerCallState( - RefCountedPtr parent_xdslb_policy); + /// Contains a call to the LB server and all the data related to the call. + class BalancerCallState : public InternallyRefCounted { + public: + explicit BalancerCallState(RefCountedPtr lb_chand); - // It's the caller's responsibility to ensure that Orphan() is called from - // inside the combiner. - void Orphan() override; - - void StartQuery(); - - XdsLbClientStats* client_stats() const { return client_stats_.get(); } + // It's the caller's responsibility to ensure that Orphan() is called from + // inside the combiner. + void Orphan() override; - bool seen_initial_response() const { return seen_initial_response_; } - - private: - // So Delete() can access our private dtor. - template - friend void grpc_core::Delete(T*); + void StartQuery(); - ~BalancerCallState(); + RefCountedPtr client_stats() const { + return client_stats_; + } - XdsLb* xdslb_policy() const { - return static_cast(xdslb_policy_.get()); - } + bool seen_initial_response() const { return seen_initial_response_; } - void ScheduleNextClientLoadReportLocked(); - void SendClientLoadReportLocked(); + private: + // So Delete() can access our private dtor. + template + friend void grpc_core::Delete(T*); - static bool LoadReportCountersAreZero(xds_grpclb_request* request); + ~BalancerCallState(); - static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); - static void OnInitialRequestSentLocked(void* arg, grpc_error* error); - static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); - static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); + XdsLb* xdslb_policy() const { return lb_chand_->xdslb_policy_.get(); } - // The owning LB policy. - RefCountedPtr xdslb_policy_; + bool IsCurrentCallOnChannel() const { + return this == lb_chand_->lb_calld_.get(); + } - // The streaming call to the LB server. Always non-NULL. - grpc_call* lb_call_ = nullptr; + void ScheduleNextClientLoadReportLocked(); + void SendClientLoadReportLocked(); + + static bool LoadReportCountersAreZero(xds_grpclb_request* request); + + static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); + static void OnInitialRequestSentLocked(void* arg, grpc_error* error); + static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); + static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); + + // The owning LB channel. + RefCountedPtr lb_chand_; + + // The streaming call to the LB server. Always non-NULL. 
+ grpc_call* lb_call_ = nullptr; + + // recv_initial_metadata + grpc_metadata_array lb_initial_metadata_recv_; + + // send_message + grpc_byte_buffer* send_message_payload_ = nullptr; + grpc_closure lb_on_initial_request_sent_; + + // recv_message + grpc_byte_buffer* recv_message_payload_ = nullptr; + grpc_closure lb_on_balancer_message_received_; + bool seen_initial_response_ = false; + + // recv_trailing_metadata + grpc_closure lb_on_balancer_status_received_; + grpc_metadata_array lb_trailing_metadata_recv_; + grpc_status_code lb_call_status_; + grpc_slice lb_call_status_details_; + + // The stats for client-side load reporting associated with this LB call. + // Created after the first serverlist is received. + RefCountedPtr client_stats_; + grpc_millis client_stats_report_interval_ = 0; + grpc_timer client_load_report_timer_; + bool client_load_report_timer_callback_pending_ = false; + bool last_client_load_report_counters_were_zero_ = false; + bool client_load_report_is_due_ = false; + // The closure used for either the load report timer or the callback for + // completion of sending the load report. + grpc_closure client_load_report_closure_; + }; + + BalancerChannelState(const char* balancer_name, + const grpc_channel_args& args, + RefCountedPtr parent_xdslb_policy); + ~BalancerChannelState(); - // recv_initial_metadata - grpc_metadata_array lb_initial_metadata_recv_; + void Orphan() override; - // send_message - grpc_byte_buffer* send_message_payload_ = nullptr; - grpc_closure lb_on_initial_request_sent_; + grpc_channel* channel() const { return channel_; } + BalancerCallState* lb_calld() const { return lb_calld_.get(); } - // recv_message - grpc_byte_buffer* recv_message_payload_ = nullptr; - grpc_closure lb_on_balancer_message_received_; - bool seen_initial_response_ = false; + bool IsCurrentChannel() const { + return this == xdslb_policy_->lb_chand_.get(); + } + bool IsPendingChannel() const { + return this == xdslb_policy_->pending_lb_chand_.get(); + } + bool HasActiveCall() const { return lb_calld_ != nullptr; } - // recv_trailing_metadata - grpc_closure lb_on_balancer_status_received_; - grpc_metadata_array lb_trailing_metadata_recv_; - grpc_status_code lb_call_status_; - grpc_slice lb_call_status_details_; + void StartCallRetryTimerLocked(); + static void OnCallRetryTimerLocked(void* arg, grpc_error* error); + void StartCallLocked(); - // The stats for client-side load reporting associated with this LB call. - // Created after the first serverlist is received. - RefCountedPtr client_stats_; - grpc_millis client_stats_report_interval_ = 0; - grpc_timer client_load_report_timer_; - bool client_load_report_timer_callback_pending_ = false; - bool last_client_load_report_counters_were_zero_ = false; - bool client_load_report_is_due_ = false; - // The closure used for either the load report timer or the callback for - // completion of sending the load report. - grpc_closure client_load_report_closure_; + private: + // The owning LB policy. + RefCountedPtr xdslb_policy_; + + // The channel and its status. + grpc_channel* channel_; + bool shutting_down_ = false; + + // The data associated with the current LB call. It holds a ref to this LB + // channel. It's instantiated every time we query for backends. It's reset + // whenever the current LB call is no longer needed (e.g., the LB policy is + // shutting down, or the LB call has ended). A non-NULL lb_calld_ always + // contains a non-NULL lb_call_. 
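
Reduced to standard C++, the ownership shape of the two nested classes above looks roughly like this (an analogy only: OrphanablePtr adds orphaning semantics and RefCountedPtr is an intrusive refcount, which plain smart pointers are not):

#include <memory>

struct ChannelState;  // one channel to one balancer

struct CallState {
  // The call keeps a back-reference to its owning channel; the real code
  // holds a strong ref (lb_chand_) so the channel outlives the call.
  ChannelState* chand;
};

struct ChannelState {
  // The channel owns at most one active call; resetting this pointer is how
  // a call is torn down when the channel is orphaned or the call ends.
  std::unique_ptr<CallState> calld;
};
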
+ OrphanablePtr lb_calld_; + BackOff lb_call_backoff_; + grpc_timer lb_call_retry_timer_; + grpc_closure lb_on_call_retry_; + bool retry_timer_callback_pending_ = false; }; class Picker : public SubchannelPicker { @@ -245,13 +295,13 @@ class XdsLb : public LoadBalancingPolicy { // found. Does nothing upon failure. void ParseLbConfig(Config* xds_config); - // Methods for dealing with the balancer channel and call. - void StartBalancerCallLocked(); + BalancerChannelState* LatestLbChannel() const { + return pending_lb_chand_ != nullptr ? pending_lb_chand_.get() + : lb_chand_.get(); + } + + // Callback to enter fallback mode. static void OnFallbackTimerLocked(void* arg, grpc_error* error); - void StartBalancerCallRetryTimerLocked(); - static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); - static void OnBalancerChannelConnectivityChangedLocked(void* arg, - grpc_error* error); // Methods for dealing with the child policy. void CreateOrUpdateChildPolicyLocked(); @@ -271,30 +321,15 @@ class XdsLb : public LoadBalancingPolicy { bool shutting_down_ = false; // The channel for communicating with the LB server. - grpc_channel* lb_channel_ = nullptr; + OrphanablePtr lb_chand_; + OrphanablePtr pending_lb_chand_; // Mutex to protect the channel to the LB server. This is used when // processing a channelz request. - gpr_mu lb_channel_mu_; - grpc_connectivity_state lb_channel_connectivity_; - grpc_closure lb_channel_on_connectivity_changed_; - // Are we already watching the LB channel's connectivity? - bool watching_lb_channel_ = false; - // Response generator to inject address updates into lb_channel_. - RefCountedPtr response_generator_; - - // The data associated with the current LB call. It holds a ref to this LB - // policy. It's initialized every time we query for backends. It's reset to - // NULL whenever the current LB call is no longer needed (e.g., the LB policy - // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always - // contains a non-NULL lb_call_. - OrphanablePtr lb_calld_; + // TODO(juanlishen): Replace this with atomic. + gpr_mu lb_chand_mu_; + // Timeout in milliseconds for the LB call. 0 means no deadline. int lb_call_timeout_ms_ = 0; - // Balancer call retry state. - BackOff lb_call_backoff_; - bool retry_timer_callback_pending_ = false; - grpc_timer lb_call_retry_timer_; - grpc_closure lb_on_call_retry_; // The deserialized response from the balancer. May be nullptr until one // such response has arrived. @@ -360,11 +395,11 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state, // TODO(juanlishen): When in fallback mode, pass the child picker // through without wrapping it. (Or maybe use a different helper for // the fallback policy?) - RefCountedPtr client_stats; - if (parent_->lb_calld_ != nullptr && - parent_->lb_calld_->client_stats() != nullptr) { - client_stats = parent_->lb_calld_->client_stats()->Ref(); - } + GPR_ASSERT(parent_->lb_chand_ != nullptr); + RefCountedPtr client_stats = + parent_->lb_chand_->lb_calld() == nullptr + ? nullptr + : parent_->lb_chand_->lb_calld()->client_stats(); parent_->channel_control_helper()->UpdateState( state, state_error, UniquePtr( @@ -379,12 +414,13 @@ void XdsLb::Helper::RequestReresolution() { "(%p).", parent_.get(), parent_->child_policy_.get()); } + GPR_ASSERT(parent_->lb_chand_ != nullptr); // If we are talking to a balancer, we expect to get updated addresses // from the balancer, so we can ignore the re-resolution request from - // the RR policy. 
Otherwise, pass the re-resolution request up to the + // the child policy. Otherwise, pass the re-resolution request up to the // channel. - if (parent_->lb_calld_ == nullptr || - !parent_->lb_calld_->seen_initial_response()) { + if (parent_->lb_chand_->lb_calld() == nullptr || + !parent_->lb_chand_->lb_calld()->seen_initial_response()) { parent_->channel_control_helper()->RequestReresolution(); } } @@ -465,14 +501,98 @@ UniquePtr ProcessServerlist( } // -// XdsLb::BalancerCallState +// XdsLb::BalancerChannelState // -XdsLb::BalancerCallState::BalancerCallState( - RefCountedPtr parent_xdslb_policy) +XdsLb::BalancerChannelState::BalancerChannelState( + const char* balancer_name, const grpc_channel_args& args, + grpc_core::RefCountedPtr parent_xdslb_policy) + : InternallyRefCounted(&grpc_lb_xds_trace), + xdslb_policy_(std::move(parent_xdslb_policy)), + lb_call_backoff_( + BackOff::Options() + .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS * + 1000) + .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_XDS_RECONNECT_JITTER) + .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { + channel_ = xdslb_policy_->channel_control_helper()->CreateChannel( + balancer_name, args); + GPR_ASSERT(channel_ != nullptr); + StartCallLocked(); +} + +XdsLb::BalancerChannelState::~BalancerChannelState() { + grpc_channel_destroy(channel_); +} + +void XdsLb::BalancerChannelState::Orphan() { + shutting_down_ = true; + lb_calld_.reset(); + if (retry_timer_callback_pending_) grpc_timer_cancel(&lb_call_retry_timer_); + Unref(DEBUG_LOCATION, "lb_channel_orphaned"); +} + +void XdsLb::BalancerChannelState::StartCallRetryTimerLocked() { + grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Failed to connect to LB server (lb_chand: %p)...", + xdslb_policy_.get(), this); + grpc_millis timeout = next_try - ExecCtx::Get()->Now(); + if (timeout > 0) { + gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.", + xdslb_policy_.get(), timeout); + } else { + gpr_log(GPR_INFO, "[xdslb %p] ... 
retry_timer_active immediately.", + xdslb_policy_.get()); + } + } + Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer").release(); + GRPC_CLOSURE_INIT(&lb_on_call_retry_, &OnCallRetryTimerLocked, this, + grpc_combiner_scheduler(xdslb_policy_->combiner())); + grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); + retry_timer_callback_pending_ = true; +} + +void XdsLb::BalancerChannelState::OnCallRetryTimerLocked(void* arg, + grpc_error* error) { + BalancerChannelState* lb_chand = static_cast(arg); + lb_chand->retry_timer_callback_pending_ = false; + if (!lb_chand->shutting_down_ && error == GRPC_ERROR_NONE && + lb_chand->lb_calld_ == nullptr) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Restarting call to LB server (lb_chand: %p)", + lb_chand->xdslb_policy_.get(), lb_chand); + } + lb_chand->StartCallLocked(); + } + lb_chand->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); +} + +void XdsLb::BalancerChannelState::StartCallLocked() { + if (shutting_down_) return; + GPR_ASSERT(channel_ != nullptr); + GPR_ASSERT(lb_calld_ == nullptr); + lb_calld_ = MakeOrphanable(Ref()); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Query for backends (lb_chand: %p, lb_calld: %p)", + xdslb_policy_.get(), this, lb_calld_.get()); + } + lb_calld_->StartQuery(); +} + +// +// XdsLb::BalancerChannelState::BalancerCallState +// + +XdsLb::BalancerChannelState::BalancerCallState::BalancerCallState( + RefCountedPtr lb_chand) : InternallyRefCounted(&grpc_lb_xds_trace), - xdslb_policy_(std::move(parent_xdslb_policy)) { - GPR_ASSERT(xdslb_policy_ != nullptr); + lb_chand_(std::move(lb_chand)) { + GPR_ASSERT(xdslb_policy() != nullptr); GPR_ASSERT(!xdslb_policy()->shutting_down_); // Init the LB call. Note that the LB call will progress every time there's // activity in xdslb_policy_->interested_parties(), which is comprised of @@ -484,8 +604,8 @@ XdsLb::BalancerCallState::BalancerCallState( ? GRPC_MILLIS_INF_FUTURE : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_; lb_call_ = grpc_channel_create_pollset_set_call( - xdslb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, - xdslb_policy_->interested_parties(), + lb_chand_->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, + xdslb_policy()->interested_parties(), GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, nullptr, deadline, nullptr); // Init the LB call request payload. @@ -509,7 +629,7 @@ XdsLb::BalancerCallState::BalancerCallState( grpc_combiner_scheduler(xdslb_policy()->combiner())); } -XdsLb::BalancerCallState::~BalancerCallState() { +XdsLb::BalancerChannelState::BalancerCallState::~BalancerCallState() { GPR_ASSERT(lb_call_ != nullptr); grpc_call_unref(lb_call_); grpc_metadata_array_destroy(&lb_initial_metadata_recv_); @@ -519,7 +639,7 @@ XdsLb::BalancerCallState::~BalancerCallState() { grpc_slice_unref_internal(lb_call_status_details_); } -void XdsLb::BalancerCallState::Orphan() { +void XdsLb::BalancerChannelState::BalancerCallState::Orphan() { GPR_ASSERT(lb_call_ != nullptr); // If we are here because xdslb_policy wants to cancel the call, // lb_on_balancer_status_received_ will complete the cancellation and clean @@ -534,11 +654,11 @@ void XdsLb::BalancerCallState::Orphan() { // in lb_on_balancer_status_received_ instead of here. 
} -void XdsLb::BalancerCallState::StartQuery() { +void XdsLb::BalancerChannelState::BalancerCallState::StartQuery() { GPR_ASSERT(lb_call_ != nullptr); if (grpc_lb_xds_trace.enabled()) { gpr_log(GPR_INFO, "[xdslb %p] Starting LB call (lb_calld: %p, lb_call: %p)", - xdslb_policy_.get(), this, lb_call_); + xdslb_policy(), this, lb_call_); } // Create the ops. grpc_call_error call_error; @@ -606,7 +726,8 @@ void XdsLb::BalancerCallState::StartQuery() { GPR_ASSERT(GRPC_CALL_OK == call_error); } -void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { +void XdsLb::BalancerChannelState::BalancerCallState:: + ScheduleNextClientLoadReportLocked() { const grpc_millis next_client_load_report_time = ExecCtx::Get()->Now() + client_stats_report_interval_; GRPC_CLOSURE_INIT(&client_load_report_closure_, @@ -617,12 +738,11 @@ void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { client_load_report_timer_callback_pending_ = true; } -void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked( - void* arg, grpc_error* error) { +void XdsLb::BalancerChannelState::BalancerCallState:: + MaybeSendClientLoadReportLocked(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); - XdsLb* xdslb_policy = lb_calld->xdslb_policy(); lb_calld->client_load_report_timer_callback_pending_ = false; - if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) { + if (error != GRPC_ERROR_NONE || !lb_calld->IsCurrentCallOnChannel()) { lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); return; } @@ -636,7 +756,7 @@ void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked( } } -bool XdsLb::BalancerCallState::LoadReportCountersAreZero( +bool XdsLb::BalancerChannelState::BalancerCallState::LoadReportCountersAreZero( xds_grpclb_request* request) { XdsLbClientStats::DroppedCallCounts* drop_entries = static_cast( @@ -650,7 +770,8 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero( } // TODO(vpowar): Use LRS to send the client Load Report. -void XdsLb::BalancerCallState::SendClientLoadReportLocked() { +void XdsLb::BalancerChannelState::BalancerCallState:: + SendClientLoadReportLocked() { // Construct message payload. GPR_ASSERT(send_message_payload_ == nullptr); xds_grpclb_request* request = @@ -671,27 +792,27 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() { xds_grpclb_request_destroy(request); } -void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg, - grpc_error* error) { +void XdsLb::BalancerChannelState::BalancerCallState::OnInitialRequestSentLocked( + void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); grpc_byte_buffer_destroy(lb_calld->send_message_payload_); lb_calld->send_message_payload_ = nullptr; // If we attempted to send a client load report before the initial request was // sent (and this lb_calld is still in use), send the load report now. 
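
The last_client_load_report_counters_were_zero_ member declared earlier in this class backs a small suppression rule used when sending reports: an all-zero load report is still sent once, but consecutive all-zero reports are skipped. The predicate, pulled out as a hypothetical helper (not part of the patch):

// Returns true if the report should be sent. `last_was_zero` persists across
// reports, mirroring last_client_load_report_counters_were_zero_.
bool ShouldSendLoadReport(bool counters_are_zero, bool* last_was_zero) {
  const bool suppress = counters_are_zero && *last_was_zero;
  *last_was_zero = counters_are_zero;
  return !suppress;
}
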
if (lb_calld->client_load_report_is_due_ && - lb_calld == lb_calld->xdslb_policy()->lb_calld_.get()) { + lb_calld->IsCurrentCallOnChannel()) { lb_calld->SendClientLoadReportLocked(); lb_calld->client_load_report_is_due_ = false; } lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent"); } -void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( - void* arg, grpc_error* error) { +void XdsLb::BalancerChannelState::BalancerCallState:: + OnBalancerMessageReceivedLocked(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); XdsLb* xdslb_policy = lb_calld->xdslb_policy(); // Empty payload means the LB call was cancelled. - if (lb_calld != xdslb_policy->lb_calld_.get() || + if (!lb_calld->IsCurrentCallOnChannel() || lb_calld->recv_message_payload_ == nullptr) { lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); return; @@ -709,20 +830,25 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( nullptr) { // Have NOT seen initial response, look for initial response. if (initial_response->has_client_stats_report_interval) { - lb_calld->client_stats_report_interval_ = GPR_MAX( - GPR_MS_PER_SEC, xds_grpclb_duration_to_millis( - &initial_response->client_stats_report_interval)); - if (grpc_lb_xds_trace.enabled()) { + const grpc_millis interval = xds_grpclb_duration_to_millis( + &initial_response->client_stats_report_interval); + if (interval > 0) { + lb_calld->client_stats_report_interval_ = + GPR_MAX(GPR_MS_PER_SEC, interval); + } + } + if (grpc_lb_xds_trace.enabled()) { + if (lb_calld->client_stats_report_interval_ != 0) { gpr_log(GPR_INFO, "[xdslb %p] Received initial LB response message; " "client load reporting interval = %" PRId64 " milliseconds", xdslb_policy, lb_calld->client_stats_report_interval_); + } else { + gpr_log(GPR_INFO, + "[xdslb %p] Received initial LB response message; client load " + "reporting NOT enabled", + xdslb_policy); } - } else if (grpc_lb_xds_trace.enabled()) { - gpr_log(GPR_INFO, - "[xdslb %p] Received initial LB response message; client load " - "reporting NOT enabled", - xdslb_policy); } xds_grpclb_initial_response_destroy(initial_response); lb_calld->seen_initial_response_ = true; @@ -745,7 +871,23 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( } } /* update serverlist */ + // TODO(juanlishen): Don't ingore empty serverlist. if (serverlist->num_servers > 0) { + // Pending LB channel receives a serverlist; promote it. + // Note that this call can't be on a discarded pending channel, because + // such channels don't have any current call but we have checked this call + // is a current call. + if (!lb_calld->lb_chand_->IsCurrentChannel()) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Promoting pending LB channel %p to replace " + "current LB channel %p", + xdslb_policy, lb_calld->lb_chand_.get(), + lb_calld->xdslb_policy()->lb_chand_.get()); + } + lb_calld->xdslb_policy()->lb_chand_ = + std::move(lb_calld->xdslb_policy()->pending_lb_chand_); + } // Start sending client load report only after we start using the // serverlist returned from the current LB call. 
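
The new interval handling above also changes one edge case: a non-positive reporting interval now leaves client load reporting disabled (interval stays 0) instead of being clamped up to one second, while positive intervals are still clamped to at least GPR_MS_PER_SEC. As a standalone rule (hypothetical helper):

// 0 or negative disables load reporting; positive values are raised to >= 1s.
grpc_millis EffectiveReportInterval(grpc_millis interval) {
  if (interval <= 0) return 0;
  return GPR_MAX(GPR_MS_PER_SEC, interval);
}
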
if (lb_calld->client_stats_report_interval_ > 0 && @@ -818,37 +960,53 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( } } -void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( - void* arg, grpc_error* error) { +void XdsLb::BalancerChannelState::BalancerCallState:: + OnBalancerStatusReceivedLocked(void* arg, grpc_error* error) { BalancerCallState* lb_calld = static_cast(arg); XdsLb* xdslb_policy = lb_calld->xdslb_policy(); + BalancerChannelState* lb_chand = lb_calld->lb_chand_.get(); GPR_ASSERT(lb_calld->lb_call_ != nullptr); if (grpc_lb_xds_trace.enabled()) { char* status_details = grpc_slice_to_c_string(lb_calld->lb_call_status_details_); gpr_log(GPR_INFO, "[xdslb %p] Status from LB server received. Status = %d, details " - "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", - xdslb_policy, lb_calld->lb_call_status_, status_details, lb_calld, - lb_calld->lb_call_, grpc_error_string(error)); + "= '%s', (lb_chand: %p, lb_calld: %p, lb_call: %p), error '%s'", + xdslb_policy, lb_calld->lb_call_status_, status_details, lb_chand, + lb_calld, lb_calld->lb_call_, grpc_error_string(error)); gpr_free(status_details); } - // If this lb_calld is still in use, this call ended because of a failure so - // we want to retry connecting. Otherwise, we have deliberately ended this - // call and no further action is required. - if (lb_calld == xdslb_policy->lb_calld_.get()) { - xdslb_policy->lb_calld_.reset(); + // Ignore status from a stale call. + if (lb_calld->IsCurrentCallOnChannel()) { + // Because this call is the current one on the channel, the channel can't + // have been swapped out; otherwise, the call should have been reset. + GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel()); GPR_ASSERT(!xdslb_policy->shutting_down_); - xdslb_policy->channel_control_helper()->RequestReresolution(); - if (lb_calld->seen_initial_response_) { - // If we lose connection to the LB server, reset the backoff and restart - // the LB call immediately. - xdslb_policy->lb_call_backoff_.Reset(); - xdslb_policy->StartBalancerCallLocked(); + if (lb_chand != xdslb_policy->LatestLbChannel()) { + // This channel must be the current one and there is a pending one. Swap + // in the pending one and we are done. + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Promoting pending LB channel %p to replace " + "current LB channel %p", + xdslb_policy, lb_calld->lb_chand_.get(), + lb_calld->xdslb_policy()->lb_chand_.get()); + } + xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_); } else { - // If this LB call fails establishing any connection to the LB server, - // retry later. - xdslb_policy->StartBalancerCallRetryTimerLocked(); + // This channel is the most recently created one. Try to restart the call + // and reresolve. + lb_chand->lb_calld_.reset(); + if (lb_calld->seen_initial_response_) { + // If we lost connection to the LB server, reset the backoff and restart + // the LB call immediately. + lb_chand->lb_call_backoff_.Reset(); + lb_chand->StartCallLocked(); + } else { + // If we failed to connect to the LB server, retry later. 
+ lb_chand->StartCallRetryTimerLocked(); + } + xdslb_policy->channel_control_helper()->RequestReresolution(); } } lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended"); @@ -858,53 +1016,23 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( // helper code for creating balancer channel // -UniquePtr ExtractBalancerAddresses( - const ServerAddressList& addresses) { - auto balancer_addresses = MakeUnique(); - for (size_t i = 0; i < addresses.size(); ++i) { - if (addresses[i].IsBalancer()) { - balancer_addresses->emplace_back(addresses[i]); - } - } - return balancer_addresses; -} - -/* Returns the channel args for the LB channel, used to create a bidirectional - * stream for the reception of load balancing updates. - * - * Inputs: - * - \a addresses: corresponding to the balancers. - * - \a response_generator: in order to propagate updates from the resolver - * above the grpclb policy. - * - \a args: other args inherited from the xds policy. */ -grpc_channel_args* BuildBalancerChannelArgs( - const ServerAddressList& addresses, - FakeResolverResponseGenerator* response_generator, - const grpc_channel_args* args) { - UniquePtr balancer_addresses = - ExtractBalancerAddresses(addresses); - // Channel args to remove. +// Returns the channel args for the LB channel, used to create a bidirectional +// stream for the reception of load balancing updates. +grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) { static const char* args_to_remove[] = { // LB policy name, since we want to use the default (pick_first) in // the LB channel. GRPC_ARG_LB_POLICY_NAME, + // The service config that contains the LB config. We don't want to + // recursively use xds in the LB channel. + GRPC_ARG_SERVICE_CONFIG, // The channel arg for the server URI, since that will be different for // the LB channel than for the parent channel. The client channel // factory will re-add this arg with the right value. GRPC_ARG_SERVER_URI, // The resolved addresses, which will be generated by the name resolver - // used in the LB channel. Note that the LB channel will use the fake - // resolver, so this won't actually generate a query to DNS (or some - // other name service). However, the addresses returned by the fake - // resolver will have is_balancer=false, whereas our own addresses have - // is_balancer=true. We need the LB channel to return addresses with - // is_balancer=false so that it does not wind up recursively using the - // xds LB policy, as per the special case logic in client_channel.c. + // used in the LB channel. GRPC_ARG_SERVER_ADDRESS_LIST, - // The fake resolver response generator, because we are replacing it - // with the one from the xds policy, used to propagate updates to - // the LB channel. - GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, // The LB channel should use the authority indicated by the target // authority table (see \a grpc_lb_policy_xds_modify_lb_channel_args), // as opposed to the authority from the parent channel. @@ -916,14 +1044,6 @@ grpc_channel_args* BuildBalancerChannelArgs( }; // Channel args to add. const grpc_arg args_to_add[] = { - // New server address list. - // Note that we pass these in both when creating the LB channel - // and via the fake resolver. The latter is what actually gets used. - CreateServerAddressListChannelArg(balancer_addresses.get()), - // The fake resolver response generator, which we use to inject - // address updates into the LB channel. 
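
The remove-then-add idiom used by BuildBalancerChannelArgs() here (and again in ProcessChannelArgsLocked() further down) is the standard way to derive one grpc_channel_args from another. A compact sketch with just two of the args involved (the function name is hypothetical; the API calls are the ones used in this file):

// Derive LB-channel args from the parent channel's args: drop the LB policy
// name and mark the target as an xds load balancer.
grpc_channel_args* DeriveLbChannelArgs(const grpc_channel_args* parent_args) {
  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
  grpc_arg arg_to_add = grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1);
  return grpc_channel_args_copy_and_add_and_remove(
      parent_args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
      &arg_to_add, 1);
}
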
- grpc_core::FakeResolverResponseGenerator::MakeChannelArg( - response_generator), // A channel arg indicating the target is a xds load balancer. grpc_channel_arg_integer_create( const_cast(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1), @@ -944,21 +1064,8 @@ grpc_channel_args* BuildBalancerChannelArgs( // ctor and dtor // -XdsLb::XdsLb(Args args) - : LoadBalancingPolicy(std::move(args)), - response_generator_(MakeRefCounted()), - lb_call_backoff_( - BackOff::Options() - .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS * - 1000) - .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) - .set_jitter(GRPC_XDS_RECONNECT_JITTER) - .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { - // Initialization. - gpr_mu_init(&lb_channel_mu_); - GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, - &XdsLb::OnBalancerChannelConnectivityChangedLocked, this, - grpc_combiner_scheduler(args.combiner)); +XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) { + gpr_mu_init(&lb_chand_mu_); // Record server name. const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -982,7 +1089,7 @@ XdsLb::XdsLb(Args args) } XdsLb::~XdsLb() { - gpr_mu_destroy(&lb_channel_mu_); + gpr_mu_destroy(&lb_chand_mu_); gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); if (serverlist_ != nullptr) { @@ -992,10 +1099,6 @@ XdsLb::~XdsLb() { void XdsLb::ShutdownLocked() { shutting_down_ = true; - lb_calld_.reset(); - if (retry_timer_callback_pending_) { - grpc_timer_cancel(&lb_call_retry_timer_); - } if (fallback_timer_callback_pending_) { grpc_timer_cancel(&lb_fallback_timer_); } @@ -1004,11 +1107,10 @@ void XdsLb::ShutdownLocked() { // destroying the channel triggers a last callback to // OnBalancerChannelConnectivityChangedLocked(), and we need to be // alive when that callback is invoked. - if (lb_channel_ != nullptr) { - gpr_mu_lock(&lb_channel_mu_); - grpc_channel_destroy(lb_channel_); - lb_channel_ = nullptr; - gpr_mu_unlock(&lb_channel_mu_); + { + MutexLock lock(&lb_chand_mu_); + lb_chand_.reset(); + pending_lb_chand_.reset(); } } @@ -1017,8 +1119,11 @@ void XdsLb::ShutdownLocked() { // void XdsLb::ResetBackoffLocked() { - if (lb_channel_ != nullptr) { - grpc_channel_reset_connect_backoff(lb_channel_); + if (lb_chand_ != nullptr) { + grpc_channel_reset_connect_backoff(lb_chand_->channel()); + } + if (pending_lb_chand_ != nullptr) { + grpc_channel_reset_connect_backoff(pending_lb_chand_->channel()); } if (child_policy_ != nullptr) { child_policy_->ResetBackoffLocked(); @@ -1027,12 +1132,19 @@ void XdsLb::ResetBackoffLocked() { void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, channelz::ChildRefsList* child_channels) { - // delegate to the child_policy_ to fill the children subchannels. + // Delegate to the child_policy_ to fill the children subchannels. 
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); - MutexLock lock(&lb_channel_mu_); - if (lb_channel_ != nullptr) { + MutexLock lock(&lb_chand_mu_); + if (lb_chand_ != nullptr) { grpc_core::channelz::ChannelNode* channel_node = - grpc_channel_get_channelz_node(lb_channel_); + grpc_channel_get_channelz_node(lb_chand_->channel()); + if (channel_node != nullptr) { + child_channels->push_back(channel_node->uuid()); + } + } + if (pending_lb_chand_ != nullptr) { + grpc_core::channelz::ChannelNode* channel_node = + grpc_channel_get_channelz_node(pending_lb_chand_->channel()); if (channel_node != nullptr) { child_channels->push_back(channel_node->uuid()); } @@ -1059,22 +1171,29 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { args_ = grpc_channel_args_copy_and_add_and_remove( &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); // Construct args for balancer channel. - grpc_channel_args* lb_channel_args = - BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args); - // Create balancer channel if needed. - if (lb_channel_ == nullptr) { - char* uri_str; - gpr_asprintf(&uri_str, "fake:///%s", server_name_); - gpr_mu_lock(&lb_channel_mu_); - lb_channel_ = - channel_control_helper()->CreateChannel(uri_str, *lb_channel_args); - gpr_mu_unlock(&lb_channel_mu_); - GPR_ASSERT(lb_channel_ != nullptr); - gpr_free(uri_str); + grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(&args); + // Create an LB channel if we don't have one yet or the balancer name has + // changed from the last received one. + bool create_lb_channel = lb_chand_ == nullptr; + if (lb_chand_ != nullptr) { + UniquePtr last_balancer_name( + grpc_channel_get_target(LatestLbChannel()->channel())); + create_lb_channel = + strcmp(last_balancer_name.get(), balancer_name_.get()) != 0; + } + if (create_lb_channel) { + OrphanablePtr lb_chand = + MakeOrphanable(balancer_name_.get(), + *lb_channel_args, Ref()); + if (lb_chand_ == nullptr || !lb_chand_->HasActiveCall()) { + GPR_ASSERT(pending_lb_chand_ == nullptr); + // If we do not have a working LB channel yet, use the newly created one. + lb_chand_ = std::move(lb_chand); + } else { + // Otherwise, wait until the new LB channel to be ready to swap it in. + pending_lb_chand_ = std::move(lb_chand); + } } - // Propagate updates to the LB channel (pick_first) through the fake - // resolver. - response_generator_->SetResponse(lb_channel_args); grpc_channel_args_destroy(lb_channel_args); } @@ -1114,12 +1233,13 @@ void XdsLb::ParseLbConfig(Config* xds_config) { void XdsLb::UpdateLocked(const grpc_channel_args& args, RefCountedPtr lb_config) { - const bool is_initial_update = lb_channel_ == nullptr; + const bool is_initial_update = lb_chand_ == nullptr; ParseLbConfig(lb_config.get()); // TODO(juanlishen): Pass fallback policy config update after fallback policy // is added. if (balancer_name_ == nullptr) { gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this); + return; } ProcessChannelArgsLocked(args); // Update the existing child policy. @@ -1139,24 +1259,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, fallback_timer_callback_pending_ = true; grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); } - StartBalancerCallLocked(); - } else if (!watching_lb_channel_) { - // If this is not the initial update and we're not already watching - // the LB channel's connectivity state, start a watch now. This - // ensures that we'll know when to switch to a new balancer call. 
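
The create-or-park decision in ProcessChannelArgsLocked() above reduces to two questions: is there any LB channel yet, and does the latest channel still target the balancer name from the new LB config? As a hypothetical predicate (grpc_channel_get_target() returns a copy the caller must free, hence the UniquePtr, exactly as in the patch):

#include <string.h>

bool NeedNewLbChannel(grpc_channel* latest_channel,
                      const char* balancer_name) {
  if (latest_channel == nullptr) return true;  // first update
  grpc_core::UniquePtr<char> current_target(
      grpc_channel_get_target(latest_channel));
  return strcmp(current_target.get(), balancer_name) != 0;  // name changed
}
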
- lb_channel_connectivity_ = grpc_channel_check_connectivity_state( - lb_channel_, true /* try to connect */); - grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(lb_channel_)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - watching_lb_channel_ = true; - // Ref held by closure. - Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release(); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set(interested_parties()), - &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_, - nullptr); } } @@ -1164,20 +1266,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, // code for balancer channel and call // -void XdsLb::StartBalancerCallLocked() { - GPR_ASSERT(lb_channel_ != nullptr); - if (shutting_down_) return; - // Init the LB call data. - GPR_ASSERT(lb_calld_ == nullptr); - lb_calld_ = MakeOrphanable(Ref()); - if (grpc_lb_xds_trace.enabled()) { - gpr_log(GPR_INFO, - "[xdslb %p] Query for backends (lb_channel: %p, lb_calld: %p)", - this, lb_channel_, lb_calld_.get()); - } - lb_calld_->StartQuery(); -} - void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { XdsLb* xdslb_policy = static_cast(arg); xdslb_policy->fallback_timer_callback_pending_ = false; @@ -1194,88 +1282,6 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); } -void XdsLb::StartBalancerCallRetryTimerLocked() { - grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); - if (grpc_lb_xds_trace.enabled()) { - gpr_log(GPR_INFO, "[xdslb %p] Connection to LB server lost...", this); - grpc_millis timeout = next_try - ExecCtx::Get()->Now(); - if (timeout > 0) { - gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.", - this, timeout); - } else { - gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active immediately.", this); - } - } - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. - auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); - self.release(); - GRPC_CLOSURE_INIT(&lb_on_call_retry_, &XdsLb::OnBalancerCallRetryTimerLocked, - this, grpc_combiner_scheduler(combiner())); - retry_timer_callback_pending_ = true; - grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); -} - -void XdsLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { - XdsLb* xdslb_policy = static_cast(arg); - xdslb_policy->retry_timer_callback_pending_ = false; - if (!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE && - xdslb_policy->lb_calld_ == nullptr) { - if (grpc_lb_xds_trace.enabled()) { - gpr_log(GPR_INFO, "[xdslb %p] Restarting call to LB server", - xdslb_policy); - } - xdslb_policy->StartBalancerCallLocked(); - } - xdslb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); -} - -// Invoked as part of the update process. It continues watching the LB channel -// until it shuts down or becomes READY. It's invoked even if the LB channel -// stayed READY throughout the update (for example if the update is identical). -void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, - grpc_error* error) { - XdsLb* xdslb_policy = static_cast(arg); - if (xdslb_policy->shutting_down_) goto done; - // Re-initialize the lb_call. This should also take care of updating the - // child policy. 
Note that the current child policy, if any, will - // stay in effect until an update from the new lb_call is received. - switch (xdslb_policy->lb_channel_connectivity_) { - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - // Keep watching the LB channel. - grpc_channel_element* client_channel_elem = - grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(xdslb_policy->lb_channel_)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - xdslb_policy->interested_parties()), - &xdslb_policy->lb_channel_connectivity_, - &xdslb_policy->lb_channel_on_connectivity_changed_, nullptr); - break; - } - // The LB channel may be IDLE because it's shut down before the update. - // Restart the LB call to kick the LB channel into gear. - case GRPC_CHANNEL_IDLE: - case GRPC_CHANNEL_READY: - xdslb_policy->lb_calld_.reset(); - if (xdslb_policy->retry_timer_callback_pending_) { - grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_); - } - xdslb_policy->lb_call_backoff_.Reset(); - xdslb_policy->StartBalancerCallLocked(); - // Fall through. - case GRPC_CHANNEL_SHUTDOWN: - done: - xdslb_policy->watching_lb_channel_ = false; - xdslb_policy->Unref(DEBUG_LOCATION, - "watch_lb_channel_connectivity_cb_shutdown"); - } -} - // // code for interacting with the child policy // @@ -1360,18 +1366,6 @@ class XdsFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { - /* Count the number of gRPC-LB addresses. There must be at least one. */ - const ServerAddressList* addresses = - FindServerAddressListChannelArg(args.args); - if (addresses == nullptr) return nullptr; - bool found_balancer_address = false; - for (size_t i = 0; i < addresses->size(); ++i) { - if ((*addresses)[i].IsBalancer()) { - found_balancer_address = true; - break; - } - } - if (!found_balancer_address) return nullptr; return OrphanablePtr(New(std::move(args))); } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc index 55c646e6eed..7f8c232d6d0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc @@ -33,55 +33,12 @@ #include "src/core/lib/security/transport/target_authority_table.h" #include "src/core/lib/slice/slice_internal.h" -namespace grpc_core { -namespace { - -int BalancerNameCmp(const grpc_core::UniquePtr& a, - const grpc_core::UniquePtr& b) { - return strcmp(a.get(), b.get()); -} - -RefCountedPtr CreateTargetAuthorityTable( - const ServerAddressList& addresses) { - TargetAuthorityTable::Entry* target_authority_entries = - static_cast( - gpr_zalloc(sizeof(*target_authority_entries) * addresses.size())); - for (size_t i = 0; i < addresses.size(); ++i) { - char* addr_str; - GPR_ASSERT( - grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0); - target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); - gpr_free(addr_str); - char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find( - addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME)); - target_authority_entries[i].value.reset(gpr_strdup(balancer_name)); - } - RefCountedPtr target_authority_table = - TargetAuthorityTable::Create(addresses.size(), target_authority_entries, - 
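
With the target authority table gone, the only job left for grpc_lb_policy_xds_modify_lb_channel_args() is swapping the channel credentials for a copy stripped of call credentials, per the comment that survives below. A sketch of that pattern using the core security helpers and the surrounding function's args_to_add/args_to_remove arrays (simplified; the real code also unrefs the duplicated creds after building the new args):

grpc_channel_credentials* creds = grpc_channel_credentials_find_in_args(args);
if (creds != nullptr) {
  grpc_channel_credentials* creds_sans_call_creds =
      grpc_channel_credentials_duplicate_without_call_credentials(creds);
  args_to_add[num_args_to_add++] =
      grpc_channel_credentials_to_arg(creds_sans_call_creds);
  args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS;
}
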
BalancerNameCmp); - gpr_free(target_authority_entries); - return target_authority_table; -} - -} // namespace -} // namespace grpc_core - grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( grpc_channel_args* args) { const char* args_to_remove[1]; size_t num_args_to_remove = 0; grpc_arg args_to_add[2]; size_t num_args_to_add = 0; - // Add arg for targets info table. - grpc_core::ServerAddressList* addresses = - grpc_core::FindServerAddressListChannelArg(args); - GPR_ASSERT(addresses != nullptr); - grpc_core::RefCountedPtr - target_authority_table = - grpc_core::CreateTargetAuthorityTable(*addresses); - args_to_add[num_args_to_add++] = - grpc_core::CreateTargetAuthorityTableChannelArg( - target_authority_table.get()); // Substitute the channel credentials with a version without call // credentials: the load balancer is not necessarily trusted to handle // bearer token credentials. diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 258339491c1..3489f3d491b 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -86,7 +86,14 @@ FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) { channel_args_ = grpc_channel_args_copy(args.args); FakeResolverResponseGenerator* response_generator = FakeResolverResponseGenerator::GetFromArgs(args.args); - if (response_generator != nullptr) response_generator->resolver_ = this; + if (response_generator != nullptr) { + response_generator->resolver_ = this; + if (response_generator->response_ != nullptr) { + response_generator->SetResponse(response_generator->response_); + grpc_channel_args_destroy(response_generator->response_); + response_generator->response_ = nullptr; + } + } } FakeResolver::~FakeResolver() { @@ -114,6 +121,9 @@ void FakeResolver::RequestReresolutionLocked() { void FakeResolver::MaybeFinishNextLocked() { if (next_completion_ != nullptr && (next_results_ != nullptr || return_failure_)) { + // When both next_results_ and channel_args_ contain an arg with the same + // name, only the one in next_results_ will be kept since next_results_ is + // before channel_args_. *target_result_ = return_failure_ ? 
nullptr : grpc_channel_args_union(next_results_, channel_args_); @@ -157,15 +167,19 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg, void FakeResolverResponseGenerator::SetResponse(grpc_channel_args* response) { GPR_ASSERT(response != nullptr); - GPR_ASSERT(resolver_ != nullptr); - SetResponseClosureArg* closure_arg = New(); - closure_arg->generator = this; - closure_arg->response = grpc_channel_args_copy(response); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked, - closure_arg, - grpc_combiner_scheduler(resolver_->combiner())), - GRPC_ERROR_NONE); + if (resolver_ != nullptr) { + SetResponseClosureArg* closure_arg = New(); + closure_arg->generator = this; + closure_arg->response = grpc_channel_args_copy(response); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked, + closure_arg, + grpc_combiner_scheduler(resolver_->combiner())), + GRPC_ERROR_NONE); + } else { + GPR_ASSERT(response_ == nullptr); + response_ = grpc_channel_args_copy(response); + } } void FakeResolverResponseGenerator::SetReresolutionResponseLocked( diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index d86111c3829..f423e6d46db 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -44,7 +44,9 @@ class FakeResolverResponseGenerator FakeResolverResponseGenerator() {} // Instructs the fake resolver associated with the response generator - // instance to trigger a new resolution with the specified response. + // instance to trigger a new resolution with the specified response. If the + // resolver is not available yet, delays response setting until it is. This + // can be called at most once before the resolver is available. void SetResponse(grpc_channel_args* next_response); // Sets the re-resolution response, which is returned by the fake resolver @@ -79,6 +81,7 @@ class FakeResolverResponseGenerator static void SetFailureLocked(void* arg, grpc_error* error); FakeResolver* resolver_ = nullptr; // Do not own. 
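
The response_ field added below is what makes the new buffering work: a response handed to SetResponse() before any FakeResolver is attached gets parked in response_ and replayed from the FakeResolver constructor (see the fake_resolver.cc hunk above). Typical test-side usage, sketched (response_args stands in for a previously built grpc_channel_args* carrying the addresses):

auto response_generator =
    grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
grpc_arg fake_resolver_arg =
    grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
        response_generator.get());
// fake_resolver_arg travels in the args of a channel targeting "fake:///...".
// With this patch, setting the response is legal even before that channel
// (and therefore its resolver) has been created:
response_generator->SetResponse(response_args);
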
+ grpc_channel_args* response_ = nullptr; }; } // namespace grpc_core diff --git a/src/core/lib/security/security_connector/fake/fake_security_connector.cc b/src/core/lib/security/security_connector/fake/fake_security_connector.cc index a0e2e6f030b..c55fd34d0e2 100644 --- a/src/core/lib/security/security_connector/fake/fake_security_connector.cc +++ b/src/core/lib/security/security_connector/fake/fake_security_connector.cc @@ -26,6 +26,8 @@ #include #include +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" #include "src/core/ext/transport/chttp2/alpn/alpn.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" @@ -53,8 +55,11 @@ class grpc_fake_channel_security_connector final target_(gpr_strdup(target)), expected_targets_( gpr_strdup(grpc_fake_transport_get_expected_targets(args))), - is_lb_channel_(grpc_core::FindTargetAuthorityTableInArgs(args) != - nullptr) { + is_lb_channel_( + grpc_channel_args_find( + args, GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER) != nullptr || + grpc_channel_args_find( + args, GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER) != nullptr) { const grpc_arg* target_name_override_arg = grpc_channel_args_find(args, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG); if (target_name_override_arg != nullptr) { diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index d80fa33a83a..43dee177e7a 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -439,6 +439,28 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "xds_end2end_test", + srcs = ["xds_end2end_test.cc"], + external_deps = [ + "gmock", + "gtest", + ], + deps = [ + ":test_service_impl", + "//:gpr", + "//:grpc", + "//:grpc++", + "//:grpc_resolver_fake", + "//src/proto/grpc/lb/v1:load_balancer_proto", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//src/proto/grpc/testing/duplicate:echo_duplicate_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + grpc_cc_test( name = "proto_server_reflection_test", srcs = ["proto_server_reflection_test.cc"], diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc new file mode 100644 index 00000000000..09556675d43 --- /dev/null +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -0,0 +1,1214 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+#include <memory>
+#include <mutex>
+#include <set>
+#include <sstream>
+#include <thread>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/cpp/client/secure_credentials.h"
+#include "src/cpp/server/secure_server_credentials.h"
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+
+#include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+// TODO(dgq): Other scenarios in need of testing:
+// - Send a serverlist with faulty ip:port addresses (port > 2^16, etc).
+// - Test reception of invalid serverlist
+// - Test pinging
+// - Test against a non-LB server.
+// - Random LB server closing the stream unexpectedly.
+// - Test using DNS-resolvable names (localhost?)
+// - Test handling of creation of faulty RR instance by having the LB return a
+//   serverlist with non-existent backends after having initially returned a
+//   valid one.
+//
+// Findings from end to end testing to be covered here:
+// - Handling of LB servers restart, including reconnection after backing-off
+//   retries.
+// - Destruction of load balanced channel (and therefore of xds instance)
+//   while:
+//   1) the internal LB call is still active. This should work by virtue
+//   of the weak reference the LB call holds. The call should be terminated as
+//   part of the xds shutdown process.
+//   2) the retry timer is active. Again, the weak reference it holds should
+//   prevent a premature call to \a glb_destroy.
+// - Restart of backend servers with no changes to serverlist. This exercises
+//   the RR handover mechanism.
+
+using std::chrono::system_clock;
+
+using grpc::lb::v1::LoadBalanceRequest;
+using grpc::lb::v1::LoadBalanceResponse;
+using grpc::lb::v1::LoadBalancer;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+template <typename ServiceType>
+class CountedService : public ServiceType {
+ public:
+  size_t request_count() {
+    std::unique_lock<std::mutex> lock(mu_);
+    return request_count_;
+  }
+
+  size_t response_count() {
+    std::unique_lock<std::mutex> lock(mu_);
+    return response_count_;
+  }
+
+  void IncreaseResponseCount() {
+    std::unique_lock<std::mutex> lock(mu_);
+    ++response_count_;
+  }
+  void IncreaseRequestCount() {
+    std::unique_lock<std::mutex> lock(mu_);
+    ++request_count_;
+  }
+
+  void ResetCounters() {
+    std::unique_lock<std::mutex> lock(mu_);
+    request_count_ = 0;
+    response_count_ = 0;
+  }
+
+ protected:
+  std::mutex mu_;
+
+ private:
+  size_t request_count_ = 0;
+  size_t response_count_ = 0;
+};
+
+using BackendService = CountedService<TestServiceImpl>;
+using BalancerService = CountedService<LoadBalancer::Service>;
+
+const char g_kCallCredsMdKey[] = "Balancer should not ...";
+const char g_kCallCredsMdValue[] = "... receive me";
+
+class BackendServiceImpl : public BackendService {
+ public:
+  BackendServiceImpl() {}
+
+  Status Echo(ServerContext* context, const EchoRequest* request,
+              EchoResponse* response) override {
+    // Backend should receive the call credentials metadata.
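+    // (ResetStub() composes MD-only call credentials carrying this key/value
+    // pair into the channel credentials, so every RPC that reaches a backend
+    // should carry them.)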
+    auto call_credentials_entry =
+        context->client_metadata().find(g_kCallCredsMdKey);
+    EXPECT_NE(call_credentials_entry, context->client_metadata().end());
+    if (call_credentials_entry != context->client_metadata().end()) {
+      EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
+    }
+    IncreaseRequestCount();
+    const auto status = TestServiceImpl::Echo(context, request, response);
+    IncreaseResponseCount();
+    AddClient(context->peer());
+    return status;
+  }
+
+  // Returns true on its first invocation, false otherwise.
+  bool Shutdown() {
+    std::unique_lock<std::mutex> lock(mu_);
+    const bool prev = !shutdown_;
+    shutdown_ = true;
+    gpr_log(GPR_INFO, "Backend: shut down");
+    return prev;
+  }
+
+  std::set<grpc::string> clients() {
+    std::unique_lock<std::mutex> lock(clients_mu_);
+    return clients_;
+  }
+
+ private:
+  void AddClient(const grpc::string& client) {
+    std::unique_lock<std::mutex> lock(clients_mu_);
+    clients_.insert(client);
+  }
+
+  std::mutex mu_;
+  bool shutdown_ = false;
+  std::mutex clients_mu_;
+  std::set<grpc::string> clients_;
+};
+
+grpc::string Ip4ToPackedString(const char* ip_str) {
+  struct in_addr ip4;
+  GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
+  return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
+}
+
+struct ClientStats {
+  size_t num_calls_started = 0;
+  size_t num_calls_finished = 0;
+  size_t num_calls_finished_with_client_failed_to_send = 0;
+  size_t num_calls_finished_known_received = 0;
+  std::map<grpc::string, size_t> drop_token_counts;
+
+  ClientStats& operator+=(const ClientStats& other) {
+    num_calls_started += other.num_calls_started;
+    num_calls_finished += other.num_calls_finished;
+    num_calls_finished_with_client_failed_to_send +=
+        other.num_calls_finished_with_client_failed_to_send;
+    num_calls_finished_known_received +=
+        other.num_calls_finished_known_received;
+    for (const auto& p : other.drop_token_counts) {
+      drop_token_counts[p.first] += p.second;
+    }
+    return *this;
+  }
+};
+
+class BalancerServiceImpl : public BalancerService {
+ public:
+  using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
+  using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
+
+  explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
+      : client_load_reporting_interval_seconds_(
+            client_load_reporting_interval_seconds),
+        shutdown_(false) {}
+
+  Status BalanceLoad(ServerContext* context, Stream* stream) override {
+    // TODO(juanlishen): Clean up the scoping.
+    gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
+    {
+      std::unique_lock<std::mutex> lock(mu_);
+      if (shutdown_) goto done;
+    }
+
+    {
+      // Balancer shouldn't receive the call credentials metadata.
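+      // (The call credentials are attached to the application channel; the
+      // balancer is reached through a separate LB channel, so they should not
+      // show up here.)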
+      EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
+                context->client_metadata().end());
+      LoadBalanceRequest request;
+      std::vector<ResponseDelayPair> responses_and_delays;
+
+      if (!stream->Read(&request)) {
+        goto done;
+      }
+      IncreaseRequestCount();
+      gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
+              request.DebugString().c_str());
+
+      {
+        LoadBalanceResponse initial_response;
+        initial_response.mutable_initial_response()
+            ->mutable_client_stats_report_interval()
+            ->set_seconds(client_load_reporting_interval_seconds_);
+        stream->Write(initial_response);
+      }
+
+      {
+        std::unique_lock<std::mutex> lock(mu_);
+        responses_and_delays = responses_and_delays_;
+      }
+      for (const auto& response_and_delay : responses_and_delays) {
+        {
+          std::unique_lock<std::mutex> lock(mu_);
+          if (shutdown_) goto done;
+        }
+        SendResponse(stream, response_and_delay.first,
+                     response_and_delay.second);
+      }
+      {
+        std::unique_lock<std::mutex> lock(mu_);
+        if (shutdown_) goto done;
+        serverlist_cond_.wait(lock, [this] { return serverlist_ready_; });
+      }
+
+      if (client_load_reporting_interval_seconds_ > 0) {
+        request.Clear();
+        if (stream->Read(&request)) {
+          gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
+                  this, request.DebugString().c_str());
+          GPR_ASSERT(request.has_client_stats());
+          // We need to acquire the lock here in order to prevent the notify_one
+          // below from firing before its corresponding wait is executed.
+          std::lock_guard<std::mutex> lock(mu_);
+          client_stats_.num_calls_started +=
+              request.client_stats().num_calls_started();
+          client_stats_.num_calls_finished +=
+              request.client_stats().num_calls_finished();
+          client_stats_.num_calls_finished_with_client_failed_to_send +=
+              request.client_stats()
+                  .num_calls_finished_with_client_failed_to_send();
+          client_stats_.num_calls_finished_known_received +=
+              request.client_stats().num_calls_finished_known_received();
+          for (const auto& drop_token_count :
+               request.client_stats().calls_finished_with_drop()) {
+            client_stats_
+                .drop_token_counts[drop_token_count.load_balance_token()] +=
+                drop_token_count.num_calls();
+          }
+          load_report_ready_ = true;
+          load_report_cond_.notify_one();
+        }
+      }
+    }
+  done:
+    gpr_log(GPR_INFO, "LB[%p]: done", this);
+    return Status::OK;
+  }
+
+  void add_response(const LoadBalanceResponse& response, int send_after_ms) {
+    std::unique_lock<std::mutex> lock(mu_);
+    responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
+  }
+
+  // Returns true on its first invocation, false otherwise.
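+  // Used by TearDown() (and by tests that kill a balancer mid-test) to make
+  // sure each server thread is torn down at most once.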
+  bool Shutdown() {
+    bool prev;
+    {
+      std::unique_lock<std::mutex> lock(mu_);
+      prev = !shutdown_;
+      shutdown_ = true;
+    }
+    NotifyDoneWithServerlists();
+    gpr_log(GPR_INFO, "LB[%p]: shut down", this);
+    return prev;
+  }
+
+  static LoadBalanceResponse BuildResponseForBackends(
+      const std::vector<int>& backend_ports,
+      const std::map<grpc::string, size_t>& drop_token_counts) {
+    LoadBalanceResponse response;
+    for (const auto& drop_token_count : drop_token_counts) {
+      for (size_t i = 0; i < drop_token_count.second; ++i) {
+        auto* server = response.mutable_server_list()->add_servers();
+        server->set_drop(true);
+        server->set_load_balance_token(drop_token_count.first);
+      }
+    }
+    for (const int& backend_port : backend_ports) {
+      auto* server = response.mutable_server_list()->add_servers();
+      server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
+      server->set_port(backend_port);
+      static int token_count = 0;
+      char* token;
+      gpr_asprintf(&token, "token%03d", ++token_count);
+      server->set_load_balance_token(token);
+      gpr_free(token);
+    }
+    return response;
+  }
+
+  const ClientStats& WaitForLoadReport() {
+    std::unique_lock<std::mutex> lock(mu_);
+    load_report_cond_.wait(lock, [this] { return load_report_ready_; });
+    load_report_ready_ = false;
+    return client_stats_;
+  }
+
+  void NotifyDoneWithServerlists() {
+    std::lock_guard<std::mutex> lock(mu_);
+    serverlist_ready_ = true;
+    serverlist_cond_.notify_all();
+  }
+
+ private:
+  void SendResponse(Stream* stream, const LoadBalanceResponse& response,
+                    int delay_ms) {
+    gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
+    if (delay_ms > 0) {
+      gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
+    }
+    gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
+            response.DebugString().c_str());
+    IncreaseResponseCount();
+    stream->Write(response);
+  }
+
+  const int client_load_reporting_interval_seconds_;
+  std::vector<ResponseDelayPair> responses_and_delays_;
+  std::mutex mu_;
+  std::condition_variable load_report_cond_;
+  bool load_report_ready_ = false;
+  std::condition_variable serverlist_cond_;
+  bool serverlist_ready_ = false;
+  ClientStats client_stats_;
+  bool shutdown_;
+};
+
+class XdsEnd2endTest : public ::testing::Test {
+ protected:
+  XdsEnd2endTest(int num_backends, int num_balancers,
+                 int client_load_reporting_interval_seconds)
+      : server_host_("localhost"),
+        num_backends_(num_backends),
+        num_balancers_(num_balancers),
+        client_load_reporting_interval_seconds_(
+            client_load_reporting_interval_seconds) {
+    // Make the backup poller poll very frequently in order to pick up
+    // updates from all the subchannels' FDs.
+    gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
+  }
+
+  void SetUp() override {
+    response_generator_ =
+        grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+    lb_channel_response_generator_ =
+        grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+    // Start the backends.
+    for (size_t i = 0; i < num_backends_; ++i) {
+      backends_.emplace_back(new BackendServiceImpl());
+      backend_servers_.emplace_back(ServerThread<BackendService>(
+          "backend", server_host_, backends_.back().get()));
+    }
+    // Start the load balancers.
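+    // (A client_load_reporting_interval_seconds_ of 0 disables load
+    // reporting: BalanceLoad() above only reads a load report when the
+    // interval is positive.)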
+    for (size_t i = 0; i < num_balancers_; ++i) {
+      balancers_.emplace_back(
+          new BalancerServiceImpl(client_load_reporting_interval_seconds_));
+      balancer_servers_.emplace_back(ServerThread<BalancerService>(
+          "balancer", server_host_, balancers_.back().get()));
+    }
+    ResetStub();
+  }
+
+  void TearDown() override {
+    for (size_t i = 0; i < backends_.size(); ++i) {
+      if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
+    }
+    for (size_t i = 0; i < balancers_.size(); ++i) {
+      if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
+    }
+  }
+
+  void ResetStub(int fallback_timeout = 0,
+                 const grpc::string& expected_targets = "") {
+    ChannelArguments args;
+    // TODO(juanlishen): Add setter to ChannelArguments.
+    args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, fallback_timeout);
+    args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+                    response_generator_.get());
+    if (!expected_targets.empty()) {
+      args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
+    }
+    std::ostringstream uri;
+    uri << "fake:///" << kApplicationTargetName_;
+    // TODO(dgq): templatize tests to run everything using both secure and
+    // insecure channel credentials.
+    grpc_channel_credentials* channel_creds =
+        grpc_fake_transport_security_credentials_create();
+    grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create(
+        g_kCallCredsMdKey, g_kCallCredsMdValue, false);
+    std::shared_ptr<ChannelCredentials> creds(
+        new SecureChannelCredentials(grpc_composite_channel_credentials_create(
+            channel_creds, call_creds, nullptr)));
+    call_creds->Unref();
+    channel_creds->Unref();
+    channel_ = CreateCustomChannel(uri.str(), creds, args);
+    stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+  }
+
+  void ResetBackendCounters() {
+    for (const auto& backend : backends_) backend->ResetCounters();
+  }
+
+  ClientStats WaitForLoadReports() {
+    ClientStats client_stats;
+    for (const auto& balancer : balancers_) {
+      client_stats += balancer->WaitForLoadReport();
+    }
+    return client_stats;
+  }
+
+  bool SeenAllBackends() {
+    for (const auto& backend : backends_) {
+      if (backend->request_count() == 0) return false;
+    }
+    return true;
+  }
+
+  void SendRpcAndCount(int* num_total, int* num_ok, int* num_failure,
+                       int* num_drops) {
+    const Status status = SendRpc();
+    if (status.ok()) {
+      ++*num_ok;
+    } else {
+      if (status.error_message() == "Call dropped by load balancing policy") {
+        ++*num_drops;
+      } else {
+        ++*num_failure;
+      }
+    }
+    ++*num_total;
+  }
+
+  std::tuple<int, int, int> WaitForAllBackends(
+      int num_requests_multiple_of = 1) {
+    int num_ok = 0;
+    int num_failure = 0;
+    int num_drops = 0;
+    int num_total = 0;
+    while (!SeenAllBackends()) {
+      SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
+    }
+    while (num_total % num_requests_multiple_of != 0) {
+      SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
+    }
+    ResetBackendCounters();
+    gpr_log(GPR_INFO,
+            "Performed %d warm up requests (a multiple of %d) against the "
+            "backends. %d succeeded, %d failed, %d dropped.",
+            num_total, num_requests_multiple_of, num_ok, num_failure,
+            num_drops);
+    return std::make_tuple(num_ok, num_failure, num_drops);
+  }
+
+  void WaitForBackend(size_t backend_idx) {
+    do {
+      (void)SendRpc();
+    } while (backends_[backend_idx]->request_count() == 0);
+    ResetBackendCounters();
+  }
+
+  grpc_core::ServerAddressList CreateLbAddressesFromPortList(
+      const std::vector<int>& ports) {
+    grpc_core::ServerAddressList addresses;
+    for (int port : ports) {
+      char* lb_uri_str;
+      gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
+      grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
+      GPR_ASSERT(lb_uri != nullptr);
+      grpc_resolved_address address;
+      GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
+      std::vector<grpc_arg> args_to_add;
+      grpc_channel_args* args = grpc_channel_args_copy_and_add(
+          nullptr, args_to_add.data(), args_to_add.size());
+      addresses.emplace_back(address.addr, address.len, args);
+      grpc_uri_destroy(lb_uri);
+      gpr_free(lb_uri_str);
+    }
+    return addresses;
+  }
+
+  void SetNextResolution(const std::vector<int>& ports,
+                         const char* service_config_json = nullptr,
+                         grpc_core::FakeResolverResponseGenerator*
+                             lb_channel_response_generator = nullptr) {
+    grpc_core::ExecCtx exec_ctx;
+    grpc_core::ServerAddressList addresses =
+        CreateLbAddressesFromPortList(ports);
+    std::vector<grpc_arg> args = {
+        CreateServerAddressListChannelArg(&addresses),
+        grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
+            lb_channel_response_generator == nullptr
+                ? lb_channel_response_generator_.get()
+                : lb_channel_response_generator)};
+    if (service_config_json != nullptr) {
+      args.push_back(grpc_channel_arg_string_create(
+          const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
+          const_cast<char*>(service_config_json)));
+    }
+    grpc_channel_args fake_result = {args.size(), args.data()};
+    response_generator_->SetResponse(&fake_result);
+  }
+
+  void SetNextResolutionForLbChannelAllBalancers(
+      const char* service_config_json = nullptr,
+      grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator =
+          nullptr) {
+    std::vector<int> ports;
+    for (size_t i = 0; i < balancer_servers_.size(); ++i) {
+      ports.emplace_back(balancer_servers_[i].port_);
+    }
+    SetNextResolutionForLbChannel(ports, service_config_json,
+                                  lb_channel_response_generator);
+  }
+
+  void SetNextResolutionForLbChannel(
+      const std::vector<int>& ports, const char* service_config_json = nullptr,
+      grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator =
+          nullptr) {
+    grpc_core::ExecCtx exec_ctx;
+    grpc_core::ServerAddressList addresses =
+        CreateLbAddressesFromPortList(ports);
+    std::vector<grpc_arg> args = {
+        CreateServerAddressListChannelArg(&addresses),
+    };
+    if (service_config_json != nullptr) {
+      args.push_back(grpc_channel_arg_string_create(
+          const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
+          const_cast<char*>(service_config_json)));
+    }
+    grpc_channel_args fake_result = {args.size(), args.data()};
+    if (lb_channel_response_generator == nullptr) {
+      lb_channel_response_generator = lb_channel_response_generator_.get();
+    }
+    lb_channel_response_generator->SetResponse(&fake_result);
+  }
+
+  void SetNextReresolutionResponse(const std::vector<int>& ports) {
+    grpc_core::ExecCtx exec_ctx;
+    grpc_core::ServerAddressList addresses =
+        CreateLbAddressesFromPortList(ports);
+    grpc_arg fake_addresses = CreateServerAddressListChannelArg(&addresses);
+    grpc_channel_args fake_result = {1, &fake_addresses};
+    response_generator_->SetReresolutionResponse(&fake_result);
+  }
+
+  const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
+    std::vector<int> backend_ports;
+    for (size_t i = start_index; i < backend_servers_.size(); ++i) {
+      backend_ports.push_back(backend_servers_[i].port_);
+    }
+    return backend_ports;
+  }
+
+  void ScheduleResponseForBalancer(size_t i,
+                                   const LoadBalanceResponse& response,
+                                   int delay_ms) {
+    balancers_.at(i)->add_response(response, delay_ms);
+  }
+
+  Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
+                 bool wait_for_ready = false) {
+    const bool local_response = (response == nullptr);
+    if (local_response) response = new EchoResponse;
+    EchoRequest request;
+    request.set_message(kRequestMessage_);
+    ClientContext context;
+    context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
+    if (wait_for_ready) context.set_wait_for_ready(true);
+    Status status = stub_->Echo(&context, request, response);
+    if (local_response) delete response;
+    return status;
+  }
+
+  void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000,
+                      bool wait_for_ready = false) {
+    for (size_t i = 0; i < times; ++i) {
+      EchoResponse response;
+      const Status status = SendRpc(&response, timeout_ms, wait_for_ready);
+      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                               << " message=" << status.error_message();
+      EXPECT_EQ(response.message(), kRequestMessage_);
+    }
+  }
+
+  void CheckRpcSendFailure() {
+    const Status status = SendRpc();
+    EXPECT_FALSE(status.ok());
+  }
+
+  template <typename T>
+  struct ServerThread {
+    explicit ServerThread(const grpc::string& type,
+                          const grpc::string& server_host, T* service)
+        : type_(type), service_(service) {
+      std::mutex mu;
+      // We need to acquire the lock here in order to prevent the notify_one
+      // by ServerThread::Start from firing before the wait below is hit.
+      std::unique_lock<std::mutex> lock(mu);
+      port_ = grpc_pick_unused_port_or_die();
+      gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
+      std::condition_variable cond;
+      thread_.reset(new std::thread(
+          std::bind(&ServerThread::Start, this, server_host, &mu, &cond)));
+      cond.wait(lock);
+      gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
+    }
+
+    void Start(const grpc::string& server_host, std::mutex* mu,
+               std::condition_variable* cond) {
+      // We need to acquire the lock here in order to prevent the notify_one
+      // below from firing before its corresponding wait is executed.
+      std::lock_guard<std::mutex> lock(*mu);
+      std::ostringstream server_address;
+      server_address << server_host << ":" << port_;
+      ServerBuilder builder;
+      std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
+          grpc_fake_transport_security_server_credentials_create()));
+      builder.AddListeningPort(server_address.str(), creds);
+      builder.RegisterService(service_);
+      server_ = builder.BuildAndStart();
+      cond->notify_one();
+    }
+
+    void Shutdown() {
+      gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
+      server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
+      thread_->join();
+      gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
+    }
+
+    int port_;
+    grpc::string type_;
+    std::unique_ptr<Server> server_;
+    T* service_;
+    std::unique_ptr<std::thread> thread_;
+  };
+
+  const grpc::string server_host_;
+  const size_t num_backends_;
+  const size_t num_balancers_;
+  const int client_load_reporting_interval_seconds_;
+  std::shared_ptr<Channel> channel_;
+  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+  std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
+  std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
+  std::vector<ServerThread<BackendService>> backend_servers_;
+  std::vector<ServerThread<BalancerService>> balancer_servers_;
+  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+      response_generator_;
+  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+      lb_channel_response_generator_;
+  const grpc::string kRequestMessage_ = "Live long and prosper.";
+  const grpc::string kApplicationTargetName_ = "application_target_name";
+  const grpc::string kDefaultServiceConfig_ =
+      "{\n"
+      "  \"loadBalancingConfig\":[\n"
+      "    { \"does_not_exist\":{} },\n"
+      "    { \"xds_experimental\":{ \"balancerName\": \"fake:///lb\" } }\n"
+      "  ]\n"
+      "}";
+};
+
+class SingleBalancerTest : public XdsEnd2endTest {
+ public:
+  SingleBalancerTest() : XdsEnd2endTest(4, 1, 0) {}
+};
+
+TEST_F(SingleBalancerTest, Vanilla) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const size_t kNumRpcsPerAddress = 100;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      0);
+  // Make sure that trying to connect works without a call.
+  channel_->GetState(true /* try_to_connect */);
+  // We need to wait for all backends to come online.
+  WaitForAllBackends();
+  // Send kNumRpcsPerAddress RPCs per server.
+  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
+  // Each backend should have gotten 100 requests.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
+  }
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+
+  // Check LB policy name for the channel.
+  EXPECT_EQ("xds_experimental", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  // Same backend listed twice.
+  std::vector<int> ports;
+  ports.push_back(backend_servers_[0].port_);
+  ports.push_back(backend_servers_[0].port_);
+  const size_t kNumRpcsPerAddress = 10;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0);
+  // We need to wait for the backend to come online.
+  WaitForBackend(0);
+  // Send kNumRpcsPerAddress RPCs per server.
+  CheckRpcSendOk(kNumRpcsPerAddress * ports.size());
+  // Backend should have gotten 20 requests.
+  EXPECT_EQ(kNumRpcsPerAddress * 2,
+            backend_servers_[0].service_->request_count());
+  // And they should have come from a single client port, because of
+  // subchannel sharing.
+  EXPECT_EQ(1UL, backends_[0]->clients().size());
+  balancers_[0]->NotifyDoneWithServerlists();
+}
+
+TEST_F(SingleBalancerTest, SecureNaming) {
+  // TODO(juanlishen): Use separate fake creds for the balancer channel.
+  ResetStub(0, kApplicationTargetName_ + ";lb");
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannel({balancer_servers_[0].port_});
+  const size_t kNumRpcsPerAddress = 100;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      0);
+  // Make sure that trying to connect works without a call.
+  channel_->GetState(true /* try_to_connect */);
+  // We need to wait for all backends to come online.
+  WaitForAllBackends();
+  // Send kNumRpcsPerAddress RPCs per server.
+  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
+
+  // Each backend should have gotten 100 requests.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
+  }
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+}
+
+TEST_F(SingleBalancerTest, SecureNamingDeathTest) {
+  ::testing::FLAGS_gtest_death_test_style = "threadsafe";
+  // Make sure that we blow up (via abort() from the security connector) when
+  // the name from the balancer doesn't match expectations.
+  ASSERT_DEATH(
+      {
+        ResetStub(0, kApplicationTargetName_ + ";lb");
+        SetNextResolution({},
+                          "{\n"
+                          "  \"loadBalancingConfig\":[\n"
+                          "    { \"does_not_exist\":{} },\n"
+                          "    { \"xds_experimental\":{ \"balancerName\": "
+                          "\"fake:///wrong_lb\" } }\n"
+                          "  ]\n"
+                          "}");
+        SetNextResolutionForLbChannel({balancer_servers_[0].port_});
+        channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
+      },
+      "");
+}
+
+TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
+  const int kCallDeadlineMs = kServerlistDelayMs * 2;
+  // First response is an empty serverlist, sent right away.
+  ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
+  // Send a non-empty serverlist only after kServerlistDelayMs.
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      kServerlistDelayMs);
+  const auto t0 = system_clock::now();
+  // Client will block: LB will initially send empty serverlist.
+  CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */);
+  const auto elapsed_ms =
+      std::chrono::duration_cast<std::chrono::milliseconds>(
+          system_clock::now() - t0);
+  // But eventually, the LB sends a serverlist update that allows the call to
+  // proceed. The call delay must be larger than the delay in sending the
+  // populated serverlist but under the call's deadline.
+  EXPECT_GT(elapsed_ms.count(), kServerlistDelayMs);
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent two responses.
+  EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
+}
+
+TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const size_t kNumUnreachableServers = 5;
+  std::vector<int> ports;
+  for (size_t i = 0; i < kNumUnreachableServers; ++i) {
+    ports.push_back(grpc_pick_unused_port_or_die());
+  }
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0);
+  const Status status = SendRpc();
+  // The error shouldn't be DEADLINE_EXCEEDED.
+  EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+}
+
+// The fallback tests are deferred because fallback mode isn't supported yet.
+
+// TODO(juanlishen): Add TEST_F(SingleBalancerTest, Fallback)
+
+// TODO(juanlishen): Add TEST_F(SingleBalancerTest, FallbackUpdate)
+
+TEST_F(SingleBalancerTest, BackendsRestart) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const size_t kNumRpcsPerAddress = 100;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      0);
+  // Make sure that trying to connect works without a call.
+  channel_->GetState(true /* try_to_connect */);
+  // Send kNumRpcsPerAddress RPCs per server.
+  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
+  }
+  CheckRpcSendFailure();
+  for (size_t i = 0; i < num_backends_; ++i) {
+    backends_.emplace_back(new BackendServiceImpl());
+    backend_servers_.emplace_back(ServerThread<BackendService>(
+        "backend", server_host_, backends_.back().get()));
+  }
+  // The following RPC will fail due to the backend ports having changed. It
+  // will nonetheless exercise the xds-roundrobin handling of the RR policy
+  // having gone into shutdown.
+  // TODO(dgq): implement the "backend restart" component as well. We need
+  // extra machinery to either update the LB responses "on the fly" or
+  // instruct backends which ports to restart on.
+  CheckRpcSendFailure();
+}
+
+class UpdatesTest : public XdsEnd2endTest {
+ public:
+  UpdatesTest() : XdsEnd2endTest(4, 3, 0) {}
+};
+
+TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const std::vector<int> first_backend{GetBackendPorts()[0]};
+  const std::vector<int> second_backend{GetBackendPorts()[1]};
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
+  ScheduleResponseForBalancer(
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
+
+  // Wait until the first backend is ready.
+  WaitForBackend(0);
+
+  // Send 10 requests.
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  CheckRpcSendOk(10);
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+  // All 10 requests should have gone to the first backend.
+  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+  // Balancer 0 got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+  SetNextResolutionForLbChannel({balancer_servers_[1].port_});
+  gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+  gpr_timespec deadline = gpr_time_add(
+      gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
+  // Send 10 seconds' worth of RPCs.
+  do {
+    CheckRpcSendOk();
+  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
+  // The current LB call is still working, so xds keeps using it with the
+  // first balancer, which doesn't assign the second backend.
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+}
+
+TEST_F(UpdatesTest, UpdateBalancerName) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const std::vector<int> first_backend{GetBackendPorts()[0]};
+  const std::vector<int> second_backend{GetBackendPorts()[1]};
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
+  ScheduleResponseForBalancer(
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
+
+  // Wait until the first backend is ready.
+  WaitForBackend(0);
+
+  // Send 10 requests.
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  CheckRpcSendOk(10);
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+  // All 10 requests should have gone to the first backend.
+  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+  // Balancer 0 got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+  std::vector<int> ports;
+  ports.emplace_back(balancer_servers_[1].port_);
+  auto new_lb_channel_response_generator =
+      grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+  SetNextResolutionForLbChannel(ports, nullptr,
+                                new_lb_channel_response_generator.get());
+  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE BALANCER NAME ==========");
+  SetNextResolution({},
+                    "{\n"
+                    "  \"loadBalancingConfig\":[\n"
+                    "    { \"does_not_exist\":{} },\n"
+                    "    { \"xds_experimental\":{ \"balancerName\": "
+                    "\"fake:///updated_lb\" } }\n"
+                    "  ]\n"
+                    "}",
+                    new_lb_channel_response_generator.get());
+  gpr_log(GPR_INFO, "========= UPDATED BALANCER NAME ==========");
+
+  // Wait until the update has been processed, as signaled by the second
+  // backend receiving a request.
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+  WaitForBackend(1);
+
+  backend_servers_[1].service_->ResetCounters();
+  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+  CheckRpcSendOk(10);
+  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+  // All 10 requests should have gone to the second backend.
+  EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
+
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+}
+
+// Send an update with the same set of LBs as the one in SetUp() in order to
+// verify that the LB channel inside xds keeps the initial connection (which
+// by definition is also present in the update).
+TEST_F(UpdatesTest, UpdateBalancersRepeated) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const std::vector<int> first_backend{GetBackendPorts()[0]};
+  const std::vector<int> second_backend{GetBackendPorts()[0]};
+
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
+  ScheduleResponseForBalancer(
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
+
+  // Wait until the first backend is ready.
+  WaitForBackend(0);
+
+  // Send 10 requests.
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  CheckRpcSendOk(10);
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+  // All 10 requests should have gone to the first backend.
+  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+  // Balancer 0 got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+  std::vector<int> ports;
+  ports.emplace_back(balancer_servers_[0].port_);
+  ports.emplace_back(balancer_servers_[1].port_);
+  ports.emplace_back(balancer_servers_[2].port_);
+  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+  SetNextResolutionForLbChannel(ports);
+  gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+  gpr_timespec deadline = gpr_time_add(
+      gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(10000, GPR_TIMESPAN));
+  // Send 10 seconds' worth of RPCs.
+  do {
+    CheckRpcSendOk();
+  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
+  // xds continues to use the original LB call to the first balancer, which
+  // doesn't assign the second backend.
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+
+  ports.clear();
+  ports.emplace_back(balancer_servers_[0].port_);
+  ports.emplace_back(balancer_servers_[1].port_);
+  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
+  SetNextResolutionForLbChannel(ports);
+  gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
+
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+  deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+                          gpr_time_from_millis(10000, GPR_TIMESPAN));
+  // Send 10 seconds' worth of RPCs.
+  do {
+    CheckRpcSendOk();
+  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
+  // xds continues to use the original LB call to the first balancer, which
+  // doesn't assign the second backend.
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+}
+
+TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannel({balancer_servers_[0].port_});
+  const std::vector<int> first_backend{GetBackendPorts()[0]};
+  const std::vector<int> second_backend{GetBackendPorts()[1]};
+
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
+  ScheduleResponseForBalancer(
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
+
+  // Send 10 RPCs; all should be served by the first backend.
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  CheckRpcSendOk(10);
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+  // All 10 requests should have gone to the first backend.
+  EXPECT_EQ(10U, backend_servers_[0].service_->request_count());
+
+  // Kill balancer 0.
+  gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
+  if (balancers_[0]->Shutdown()) balancer_servers_[0].Shutdown();
+  gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
+
+  // This is serviced by the existing child policy.
+  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+  CheckRpcSendOk(10);
+  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+  // All 10 requests should again have gone to the first backend.
+  EXPECT_EQ(20U, backend_servers_[0].service_->request_count());
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+
+  // Balancer 0 got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[1].service_->response_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+
+  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
+  SetNextResolutionForLbChannel({balancer_servers_[1].port_});
+  gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
+
+  // Wait until the update has been processed, as signaled by the second
+  // backend receiving a request. In the meantime, the client continues to be
+  // serviced (by the first backend) without interruption.
+  EXPECT_EQ(0U, backend_servers_[1].service_->request_count());
+  WaitForBackend(1);
+
+  // This is serviced by the updated RR policy.
+  backend_servers_[1].service_->ResetCounters();
+  gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
+  CheckRpcSendOk(10);
+  gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
+  // All 10 requests should have gone to the second backend.
+  EXPECT_EQ(10U, backend_servers_[1].service_->request_count());
+
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  // The second balancer, published as part of the first update, may end up
+  // getting two requests (that is, 1 <= #req <= 2) if the firing of the LB
+  // call retry timer races with the arrival of the update containing the
+  // second balancer.
+  EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
+  EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
+  EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
+  EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
+  EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
+  EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
+}
+
+// The re-resolution tests are deferred because they rely on fallback mode,
+// which isn't supported yet.
+
+// TODO(juanlishen): Add TEST_F(UpdatesTest, ReresolveDeadBackend).
+
+// TODO(juanlishen): Add TEST_F(UpdatesWithClientLoadReportingTest,
+// ReresolveDeadBalancer)
+
+// The drop tests are deferred because drop handling hasn't been added yet.
+
+// TODO(roth): Add TEST_F(SingleBalancerTest, Drop)
+
+// TODO(roth): Add TEST_F(SingleBalancerTest, DropAllFirst)
+
+// TODO(roth): Add TEST_F(SingleBalancerTest, DropAll)
+
+class SingleBalancerWithClientLoadReportingTest : public XdsEnd2endTest {
+ public:
+  SingleBalancerWithClientLoadReportingTest() : XdsEnd2endTest(4, 1, 3) {}
+};
+
+// The client load reporting tests are deferred because client load reporting
+// isn't supported yet.
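+// (Note that the fixture above already requests a 3-second load reporting
+// interval, so these tests should only need bodies once the feature lands.)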
+ +// TODO(vpowar): Add TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) + +// TODO(roth): Add TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_init(); + grpc::testing::TestEnvironment env(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + const auto result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 0b84b8a4b95..5a1eafda6c2 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -5002,6 +5002,28 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], + "headers": [ + "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h", + "src/proto/grpc/lb/v1/load_balancer.pb.h", + "src/proto/grpc/lb/v1/load_balancer_mock.grpc.pb.h" + ], + "is_filegroup": false, + "language": "c++", + "name": "xds_end2end_test", + "src": [ + "test/cpp/end2end/xds_end2end_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 6dc9eb7f0d1..9399b9f6b9d 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5686,6 +5686,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "xds_end2end_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,