Use real resolver in xds lb channel

pull/18021/head
Juanli Shen 6 years ago
parent a2f1e924de
commit e889fda482
  1. 48
      CMakeLists.txt
  2. 52
      Makefile
  3. 13
      build.yaml
  4. 4
      include/grpc/impl/codegen/grpc_types.h
  5. 4
      src/core/ext/filters/client_channel/lb_policy.h
  6. 716
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  7. 43
      src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
  8. 34
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  9. 5
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  10. 9
      src/core/lib/security/security_connector/fake/fake_security_connector.cc
  11. 22
      test/cpp/end2end/BUILD
  12. 1214
      test/cpp/end2end/xds_end2end_test.cc
  13. 22
      tools/run_tests/generated/sources_and_headers.json
  14. 24
      tools/run_tests/generated/tests.json

@ -720,6 +720,7 @@ add_dependencies(buildtests_cxx transport_security_common_api_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx writes_per_rpc_test)
endif()
add_dependencies(buildtests_cxx xds_end2end_test)
add_dependencies(buildtests_cxx resolver_component_test_unsecure)
add_dependencies(buildtests_cxx resolver_component_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -16232,6 +16233,53 @@ endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(xds_end2end_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.h
test/cpp/end2end/xds_end2end_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
protobuf_generate_grpc_cpp(
src/proto/grpc/lb/v1/load_balancer.proto
)
target_include_directories(xds_end2end_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(xds_end2end_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
grpc_test_util
grpc++
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(public_headers_must_be_c89
test/core/surface/public_headers_must_be_c89.c
)

@ -1276,6 +1276,7 @@ time_change_test: $(BINDIR)/$(CONFIG)/time_change_test
transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test
transport_security_common_api_test: $(BINDIR)/$(CONFIG)/transport_security_common_api_test
writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test
xds_end2end_test: $(BINDIR)/$(CONFIG)/xds_end2end_test
public_headers_must_be_c89: $(BINDIR)/$(CONFIG)/public_headers_must_be_c89
gen_hpack_tables: $(BINDIR)/$(CONFIG)/gen_hpack_tables
gen_legal_metadata_characters: $(BINDIR)/$(CONFIG)/gen_legal_metadata_characters
@ -1787,6 +1788,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
$(BINDIR)/$(CONFIG)/xds_end2end_test \
$(BINDIR)/$(CONFIG)/boringssl_crypto_test_data \
$(BINDIR)/$(CONFIG)/boringssl_asn1_test \
$(BINDIR)/$(CONFIG)/boringssl_base64_test \
@ -1976,6 +1978,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \
$(BINDIR)/$(CONFIG)/xds_end2end_test \
$(BINDIR)/$(CONFIG)/resolver_component_test_unsecure \
$(BINDIR)/$(CONFIG)/resolver_component_test \
$(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker_unsecure \
@ -2500,6 +2503,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/transport_security_common_api_test || ( echo test transport_security_common_api_test failed ; exit 1 )
$(E) "[RUN] Testing writes_per_rpc_test"
$(Q) $(BINDIR)/$(CONFIG)/writes_per_rpc_test || ( echo test writes_per_rpc_test failed ; exit 1 )
$(E) "[RUN] Testing xds_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/xds_end2end_test || ( echo test xds_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing resolver_component_tests_runner_invoker_unsecure"
$(Q) $(BINDIR)/$(CONFIG)/resolver_component_tests_runner_invoker_unsecure || ( echo test resolver_component_tests_runner_invoker_unsecure failed ; exit 1 )
$(E) "[RUN] Testing resolver_component_tests_runner_invoker"
@ -21308,6 +21313,53 @@ endif
endif
XDS_END2END_TEST_SRC = \
$(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc \
test/cpp/end2end/xds_end2end_test.cc \
XDS_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(XDS_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/xds_end2end_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/xds_end2end_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/xds_end2end_test: $(PROTOBUF_DEP) $(XDS_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(XDS_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/xds_end2end_test
endif
endif
$(OBJDIR)/$(CONFIG)/src/proto/grpc/lb/v1/load_balancer.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_xds_end2end_test: $(XDS_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(XDS_END2END_TEST_OBJS:.o=.dep)
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc
PUBLIC_HEADERS_MUST_BE_C89_SRC = \
test/core/surface/public_headers_must_be_c89.c \

@ -5644,6 +5644,19 @@ targets:
- mac
- linux
- posix
- name: xds_end2end_test
gtest: true
build: test
language: c++
src:
- src/proto/grpc/lb/v1/load_balancer.proto
- test/cpp/end2end/xds_end2end_test.cc
deps:
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr
- name: public_headers_must_be_c89
build: test
language: c89

@ -317,6 +317,10 @@ typedef struct {
balancer before using fallback backend addresses from the resolver.
If 0, fallback will never be used. Default value is 10000. */
#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms"
/* Timeout in milliseconds to wait for the serverlist from the xDS load
balancer before using fallback backend addresses from the resolver.
If 0, fallback will never be used. Default value is 10000. */
#define GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS "grpc.xds_fallback_timeout_ms"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
"grpc.workaround.cronet_compression"

@ -297,8 +297,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
grpc_combiner* combiner() const { return combiner_; }
// Note: LB policies MUST NOT call any method on the helper from
// their constructor.
// Note: LB policies MUST NOT call any method on the helper from their
// constructor.
// Note: This will return null after ShutdownLocked() has been called.
ChannelControlHelper* channel_control_helper() const {
return channel_control_helper_.get();

@ -129,78 +129,128 @@ class XdsLb : public LoadBalancingPolicy {
channelz::ChildRefsList* child_channels) override;
private:
/// Contains a call to the LB server and all the data related to the call.
class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
/// Contains a channel to the LB server and all the data related to the
/// channel.
class BalancerChannelState
: public InternallyRefCounted<BalancerChannelState> {
public:
explicit BalancerCallState(
RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy);
/// Contains a call to the LB server and all the data related to the call.
class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
public:
explicit BalancerCallState(RefCountedPtr<BalancerChannelState> lb_chand);
// It's the caller's responsibility to ensure that Orphan() is called from
// inside the combiner.
void Orphan() override;
void StartQuery();
XdsLbClientStats* client_stats() const { return client_stats_.get(); }
// It's the caller's responsibility to ensure that Orphan() is called from
// inside the combiner.
void Orphan() override;
bool seen_initial_response() const { return seen_initial_response_; }
private:
// So Delete() can access our private dtor.
template <typename T>
friend void grpc_core::Delete(T*);
void StartQuery();
~BalancerCallState();
RefCountedPtr<XdsLbClientStats> client_stats() const {
return client_stats_;
}
XdsLb* xdslb_policy() const {
return static_cast<XdsLb*>(xdslb_policy_.get());
}
bool seen_initial_response() const { return seen_initial_response_; }
void ScheduleNextClientLoadReportLocked();
void SendClientLoadReportLocked();
private:
// So Delete() can access our private dtor.
template <typename T>
friend void grpc_core::Delete(T*);
static bool LoadReportCountersAreZero(xds_grpclb_request* request);
~BalancerCallState();
static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
XdsLb* xdslb_policy() const { return lb_chand_->xdslb_policy_.get(); }
// The owning LB policy.
RefCountedPtr<LoadBalancingPolicy> xdslb_policy_;
bool IsCurrentCallOnChannel() const {
return this == lb_chand_->lb_calld_.get();
}
// The streaming call to the LB server. Always non-NULL.
grpc_call* lb_call_ = nullptr;
void ScheduleNextClientLoadReportLocked();
void SendClientLoadReportLocked();
static bool LoadReportCountersAreZero(xds_grpclb_request* request);
static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
// The owning LB channel.
RefCountedPtr<BalancerChannelState> lb_chand_;
// The streaming call to the LB server. Always non-NULL.
grpc_call* lb_call_ = nullptr;
// recv_initial_metadata
grpc_metadata_array lb_initial_metadata_recv_;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure lb_on_initial_request_sent_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure lb_on_balancer_message_received_;
bool seen_initial_response_ = false;
// recv_trailing_metadata
grpc_closure lb_on_balancer_status_received_;
grpc_metadata_array lb_trailing_metadata_recv_;
grpc_status_code lb_call_status_;
grpc_slice lb_call_status_details_;
// The stats for client-side load reporting associated with this LB call.
// Created after the first serverlist is received.
RefCountedPtr<XdsLbClientStats> client_stats_;
grpc_millis client_stats_report_interval_ = 0;
grpc_timer client_load_report_timer_;
bool client_load_report_timer_callback_pending_ = false;
bool last_client_load_report_counters_were_zero_ = false;
bool client_load_report_is_due_ = false;
// The closure used for either the load report timer or the callback for
// completion of sending the load report.
grpc_closure client_load_report_closure_;
};
BalancerChannelState(const char* balancer_name,
const grpc_channel_args& args,
RefCountedPtr<XdsLb> parent_xdslb_policy);
~BalancerChannelState();
// recv_initial_metadata
grpc_metadata_array lb_initial_metadata_recv_;
void Orphan() override;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure lb_on_initial_request_sent_;
grpc_channel* channel() const { return channel_; }
BalancerCallState* lb_calld() const { return lb_calld_.get(); }
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure lb_on_balancer_message_received_;
bool seen_initial_response_ = false;
bool IsCurrentChannel() const {
return this == xdslb_policy_->lb_chand_.get();
}
bool IsPendingChannel() const {
return this == xdslb_policy_->pending_lb_chand_.get();
}
bool HasActiveCall() const { return lb_calld_ != nullptr; }
// recv_trailing_metadata
grpc_closure lb_on_balancer_status_received_;
grpc_metadata_array lb_trailing_metadata_recv_;
grpc_status_code lb_call_status_;
grpc_slice lb_call_status_details_;
void StartCallRetryTimerLocked();
static void OnCallRetryTimerLocked(void* arg, grpc_error* error);
void StartCallLocked();
// The stats for client-side load reporting associated with this LB call.
// Created after the first serverlist is received.
RefCountedPtr<XdsLbClientStats> client_stats_;
grpc_millis client_stats_report_interval_ = 0;
grpc_timer client_load_report_timer_;
bool client_load_report_timer_callback_pending_ = false;
bool last_client_load_report_counters_were_zero_ = false;
bool client_load_report_is_due_ = false;
// The closure used for either the load report timer or the callback for
// completion of sending the load report.
grpc_closure client_load_report_closure_;
private:
// The owning LB policy.
RefCountedPtr<XdsLb> xdslb_policy_;
// The channel and its status.
grpc_channel* channel_;
bool shutting_down_ = false;
// The data associated with the current LB call. It holds a ref to this LB
// channel. It's instantiated every time we query for backends. It's reset
// whenever the current LB call is no longer needed (e.g., the LB policy is
// shutting down, or the LB call has ended). A non-NULL lb_calld_ always
// contains a non-NULL lb_call_.
OrphanablePtr<BalancerCallState> lb_calld_;
BackOff lb_call_backoff_;
grpc_timer lb_call_retry_timer_;
grpc_closure lb_on_call_retry_;
bool retry_timer_callback_pending_ = false;
};
class Picker : public SubchannelPicker {
@ -245,13 +295,13 @@ class XdsLb : public LoadBalancingPolicy {
// found. Does nothing upon failure.
void ParseLbConfig(Config* xds_config);
// Methods for dealing with the balancer channel and call.
void StartBalancerCallLocked();
BalancerChannelState* LatestLbChannel() const {
return pending_lb_chand_ != nullptr ? pending_lb_chand_.get()
: lb_chand_.get();
}
// Callback to enter fallback mode.
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
static void OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error);
// Methods for dealing with the child policy.
void CreateOrUpdateChildPolicyLocked();
@ -271,30 +321,15 @@ class XdsLb : public LoadBalancingPolicy {
bool shutting_down_ = false;
// The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr;
OrphanablePtr<BalancerChannelState> lb_chand_;
OrphanablePtr<BalancerChannelState> pending_lb_chand_;
// Mutex to protect the channel to the LB server. This is used when
// processing a channelz request.
gpr_mu lb_channel_mu_;
grpc_connectivity_state lb_channel_connectivity_;
grpc_closure lb_channel_on_connectivity_changed_;
// Are we already watching the LB channel's connectivity?
bool watching_lb_channel_ = false;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// The data associated with the current LB call. It holds a ref to this LB
// policy. It's initialized every time we query for backends. It's reset to
// NULL whenever the current LB call is no longer needed (e.g., the LB policy
// is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
// contains a non-NULL lb_call_.
OrphanablePtr<BalancerCallState> lb_calld_;
// TODO(juanlishen): Replace this with atomic.
gpr_mu lb_chand_mu_;
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0;
// Balancer call retry state.
BackOff lb_call_backoff_;
bool retry_timer_callback_pending_ = false;
grpc_timer lb_call_retry_timer_;
grpc_closure lb_on_call_retry_;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
@ -360,11 +395,11 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state,
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
RefCountedPtr<XdsLbClientStats> client_stats;
if (parent_->lb_calld_ != nullptr &&
parent_->lb_calld_->client_stats() != nullptr) {
client_stats = parent_->lb_calld_->client_stats()->Ref();
}
GPR_ASSERT(parent_->lb_chand_ != nullptr);
RefCountedPtr<XdsLbClientStats> client_stats =
parent_->lb_chand_->lb_calld() == nullptr
? nullptr
: parent_->lb_chand_->lb_calld()->client_stats();
parent_->channel_control_helper()->UpdateState(
state, state_error,
UniquePtr<SubchannelPicker>(
@ -379,12 +414,13 @@ void XdsLb::Helper::RequestReresolution() {
"(%p).",
parent_.get(), parent_->child_policy_.get());
}
GPR_ASSERT(parent_->lb_chand_ != nullptr);
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the RR policy. Otherwise, pass the re-resolution request up to the
// the child policy. Otherwise, pass the re-resolution request up to the
// channel.
if (parent_->lb_calld_ == nullptr ||
!parent_->lb_calld_->seen_initial_response()) {
if (parent_->lb_chand_->lb_calld() == nullptr ||
!parent_->lb_chand_->lb_calld()->seen_initial_response()) {
parent_->channel_control_helper()->RequestReresolution();
}
}
@ -465,14 +501,98 @@ UniquePtr<ServerAddressList> ProcessServerlist(
}
//
// XdsLb::BalancerCallState
// XdsLb::BalancerChannelState
//
XdsLb::BalancerCallState::BalancerCallState(
RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy)
XdsLb::BalancerChannelState::BalancerChannelState(
const char* balancer_name, const grpc_channel_args& args,
grpc_core::RefCountedPtr<grpc_core::XdsLb> parent_xdslb_policy)
: InternallyRefCounted<BalancerChannelState>(&grpc_lb_xds_trace),
xdslb_policy_(std::move(parent_xdslb_policy)),
lb_call_backoff_(
BackOff::Options()
.set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
1000)
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
channel_ = xdslb_policy_->channel_control_helper()->CreateChannel(
balancer_name, args);
GPR_ASSERT(channel_ != nullptr);
StartCallLocked();
}
XdsLb::BalancerChannelState::~BalancerChannelState() {
grpc_channel_destroy(channel_);
}
void XdsLb::BalancerChannelState::Orphan() {
shutting_down_ = true;
lb_calld_.reset();
if (retry_timer_callback_pending_) grpc_timer_cancel(&lb_call_retry_timer_);
Unref(DEBUG_LOCATION, "lb_channel_orphaned");
}
void XdsLb::BalancerChannelState::StartCallRetryTimerLocked() {
grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Failed to connect to LB server (lb_chand: %p)...",
xdslb_policy_.get(), this);
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
if (timeout > 0) {
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.",
xdslb_policy_.get(), timeout);
} else {
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active immediately.",
xdslb_policy_.get());
}
}
Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer").release();
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &OnCallRetryTimerLocked, this,
grpc_combiner_scheduler(xdslb_policy_->combiner()));
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
retry_timer_callback_pending_ = true;
}
void XdsLb::BalancerChannelState::OnCallRetryTimerLocked(void* arg,
grpc_error* error) {
BalancerChannelState* lb_chand = static_cast<BalancerChannelState*>(arg);
lb_chand->retry_timer_callback_pending_ = false;
if (!lb_chand->shutting_down_ && error == GRPC_ERROR_NONE &&
lb_chand->lb_calld_ == nullptr) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Restarting call to LB server (lb_chand: %p)",
lb_chand->xdslb_policy_.get(), lb_chand);
}
lb_chand->StartCallLocked();
}
lb_chand->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
void XdsLb::BalancerChannelState::StartCallLocked() {
if (shutting_down_) return;
GPR_ASSERT(channel_ != nullptr);
GPR_ASSERT(lb_calld_ == nullptr);
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Query for backends (lb_chand: %p, lb_calld: %p)",
xdslb_policy_.get(), this, lb_calld_.get());
}
lb_calld_->StartQuery();
}
//
// XdsLb::BalancerChannelState::BalancerCallState
//
XdsLb::BalancerChannelState::BalancerCallState::BalancerCallState(
RefCountedPtr<BalancerChannelState> lb_chand)
: InternallyRefCounted<BalancerCallState>(&grpc_lb_xds_trace),
xdslb_policy_(std::move(parent_xdslb_policy)) {
GPR_ASSERT(xdslb_policy_ != nullptr);
lb_chand_(std::move(lb_chand)) {
GPR_ASSERT(xdslb_policy() != nullptr);
GPR_ASSERT(!xdslb_policy()->shutting_down_);
// Init the LB call. Note that the LB call will progress every time there's
// activity in xdslb_policy_->interested_parties(), which is comprised of
@ -484,8 +604,8 @@ XdsLb::BalancerCallState::BalancerCallState(
? GRPC_MILLIS_INF_FUTURE
: ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_;
lb_call_ = grpc_channel_create_pollset_set_call(
xdslb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
xdslb_policy_->interested_parties(),
lb_chand_->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
xdslb_policy()->interested_parties(),
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
nullptr, deadline, nullptr);
// Init the LB call request payload.
@ -509,7 +629,7 @@ XdsLb::BalancerCallState::BalancerCallState(
grpc_combiner_scheduler(xdslb_policy()->combiner()));
}
XdsLb::BalancerCallState::~BalancerCallState() {
XdsLb::BalancerChannelState::BalancerCallState::~BalancerCallState() {
GPR_ASSERT(lb_call_ != nullptr);
grpc_call_unref(lb_call_);
grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
@ -519,7 +639,7 @@ XdsLb::BalancerCallState::~BalancerCallState() {
grpc_slice_unref_internal(lb_call_status_details_);
}
void XdsLb::BalancerCallState::Orphan() {
void XdsLb::BalancerChannelState::BalancerCallState::Orphan() {
GPR_ASSERT(lb_call_ != nullptr);
// If we are here because xdslb_policy wants to cancel the call,
// lb_on_balancer_status_received_ will complete the cancellation and clean
@ -534,11 +654,11 @@ void XdsLb::BalancerCallState::Orphan() {
// in lb_on_balancer_status_received_ instead of here.
}
void XdsLb::BalancerCallState::StartQuery() {
void XdsLb::BalancerChannelState::BalancerCallState::StartQuery() {
GPR_ASSERT(lb_call_ != nullptr);
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
xdslb_policy_.get(), this, lb_call_);
xdslb_policy(), this, lb_call_);
}
// Create the ops.
grpc_call_error call_error;
@ -606,7 +726,8 @@ void XdsLb::BalancerCallState::StartQuery() {
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
void XdsLb::BalancerChannelState::BalancerCallState::
ScheduleNextClientLoadReportLocked() {
const grpc_millis next_client_load_report_time =
ExecCtx::Get()->Now() + client_stats_report_interval_;
GRPC_CLOSURE_INIT(&client_load_report_closure_,
@ -617,12 +738,11 @@ void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_timer_callback_pending_ = true;
}
void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked(
void* arg, grpc_error* error) {
void XdsLb::BalancerChannelState::BalancerCallState::
MaybeSendClientLoadReportLocked(void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
XdsLb* xdslb_policy = lb_calld->xdslb_policy();
lb_calld->client_load_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) {
if (error != GRPC_ERROR_NONE || !lb_calld->IsCurrentCallOnChannel()) {
lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
return;
}
@ -636,7 +756,7 @@ void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked(
}
}
bool XdsLb::BalancerCallState::LoadReportCountersAreZero(
bool XdsLb::BalancerChannelState::BalancerCallState::LoadReportCountersAreZero(
xds_grpclb_request* request) {
XdsLbClientStats::DroppedCallCounts* drop_entries =
static_cast<XdsLbClientStats::DroppedCallCounts*>(
@ -650,7 +770,8 @@ bool XdsLb::BalancerCallState::LoadReportCountersAreZero(
}
// TODO(vpowar): Use LRS to send the client Load Report.
void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
void XdsLb::BalancerChannelState::BalancerCallState::
SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
xds_grpclb_request* request =
@ -671,27 +792,27 @@ void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
xds_grpclb_request_destroy(request);
}
void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
grpc_error* error) {
void XdsLb::BalancerChannelState::BalancerCallState::OnInitialRequestSentLocked(
void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
lb_calld->send_message_payload_ = nullptr;
// If we attempted to send a client load report before the initial request was
// sent (and this lb_calld is still in use), send the load report now.
if (lb_calld->client_load_report_is_due_ &&
lb_calld == lb_calld->xdslb_policy()->lb_calld_.get()) {
lb_calld->IsCurrentCallOnChannel()) {
lb_calld->SendClientLoadReportLocked();
lb_calld->client_load_report_is_due_ = false;
}
lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
}
void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
void* arg, grpc_error* error) {
void XdsLb::BalancerChannelState::BalancerCallState::
OnBalancerMessageReceivedLocked(void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
XdsLb* xdslb_policy = lb_calld->xdslb_policy();
// Empty payload means the LB call was cancelled.
if (lb_calld != xdslb_policy->lb_calld_.get() ||
if (!lb_calld->IsCurrentCallOnChannel() ||
lb_calld->recv_message_payload_ == nullptr) {
lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
return;
@ -709,20 +830,25 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
nullptr) {
// Have NOT seen initial response, look for initial response.
if (initial_response->has_client_stats_report_interval) {
lb_calld->client_stats_report_interval_ = GPR_MAX(
GPR_MS_PER_SEC, xds_grpclb_duration_to_millis(
&initial_response->client_stats_report_interval));
if (grpc_lb_xds_trace.enabled()) {
const grpc_millis interval = xds_grpclb_duration_to_millis(
&initial_response->client_stats_report_interval);
if (interval > 0) {
lb_calld->client_stats_report_interval_ =
GPR_MAX(GPR_MS_PER_SEC, interval);
}
}
if (grpc_lb_xds_trace.enabled()) {
if (lb_calld->client_stats_report_interval_ != 0) {
gpr_log(GPR_INFO,
"[xdslb %p] Received initial LB response message; "
"client load reporting interval = %" PRId64 " milliseconds",
xdslb_policy, lb_calld->client_stats_report_interval_);
} else {
gpr_log(GPR_INFO,
"[xdslb %p] Received initial LB response message; client load "
"reporting NOT enabled",
xdslb_policy);
}
} else if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Received initial LB response message; client load "
"reporting NOT enabled",
xdslb_policy);
}
xds_grpclb_initial_response_destroy(initial_response);
lb_calld->seen_initial_response_ = true;
@ -745,7 +871,23 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
}
}
/* update serverlist */
// TODO(juanlishen): Don't ingore empty serverlist.
if (serverlist->num_servers > 0) {
// Pending LB channel receives a serverlist; promote it.
// Note that this call can't be on a discarded pending channel, because
// such channels don't have any current call but we have checked this call
// is a current call.
if (!lb_calld->lb_chand_->IsCurrentChannel()) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Promoting pending LB channel %p to replace "
"current LB channel %p",
xdslb_policy, lb_calld->lb_chand_.get(),
lb_calld->xdslb_policy()->lb_chand_.get());
}
lb_calld->xdslb_policy()->lb_chand_ =
std::move(lb_calld->xdslb_policy()->pending_lb_chand_);
}
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
@ -818,37 +960,53 @@ void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
}
}
void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked(
void* arg, grpc_error* error) {
void XdsLb::BalancerChannelState::BalancerCallState::
OnBalancerStatusReceivedLocked(void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
XdsLb* xdslb_policy = lb_calld->xdslb_policy();
BalancerChannelState* lb_chand = lb_calld->lb_chand_.get();
GPR_ASSERT(lb_calld->lb_call_ != nullptr);
if (grpc_lb_xds_trace.enabled()) {
char* status_details =
grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
gpr_log(GPR_INFO,
"[xdslb %p] Status from LB server received. Status = %d, details "
"= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
xdslb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
lb_calld->lb_call_, grpc_error_string(error));
"= '%s', (lb_chand: %p, lb_calld: %p, lb_call: %p), error '%s'",
xdslb_policy, lb_calld->lb_call_status_, status_details, lb_chand,
lb_calld, lb_calld->lb_call_, grpc_error_string(error));
gpr_free(status_details);
}
// If this lb_calld is still in use, this call ended because of a failure so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
if (lb_calld == xdslb_policy->lb_calld_.get()) {
xdslb_policy->lb_calld_.reset();
// Ignore status from a stale call.
if (lb_calld->IsCurrentCallOnChannel()) {
// Because this call is the current one on the channel, the channel can't
// have been swapped out; otherwise, the call should have been reset.
GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel());
GPR_ASSERT(!xdslb_policy->shutting_down_);
xdslb_policy->channel_control_helper()->RequestReresolution();
if (lb_calld->seen_initial_response_) {
// If we lose connection to the LB server, reset the backoff and restart
// the LB call immediately.
xdslb_policy->lb_call_backoff_.Reset();
xdslb_policy->StartBalancerCallLocked();
if (lb_chand != xdslb_policy->LatestLbChannel()) {
// This channel must be the current one and there is a pending one. Swap
// in the pending one and we are done.
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Promoting pending LB channel %p to replace "
"current LB channel %p",
xdslb_policy, lb_calld->lb_chand_.get(),
lb_calld->xdslb_policy()->lb_chand_.get());
}
xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_);
} else {
// If this LB call fails establishing any connection to the LB server,
// retry later.
xdslb_policy->StartBalancerCallRetryTimerLocked();
// This channel is the most recently created one. Try to restart the call
// and reresolve.
lb_chand->lb_calld_.reset();
if (lb_calld->seen_initial_response_) {
// If we lost connection to the LB server, reset the backoff and restart
// the LB call immediately.
lb_chand->lb_call_backoff_.Reset();
lb_chand->StartCallLocked();
} else {
// If we failed to connect to the LB server, retry later.
lb_chand->StartCallRetryTimerLocked();
}
xdslb_policy->channel_control_helper()->RequestReresolution();
}
}
lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
@ -858,53 +1016,23 @@ void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
UniquePtr<ServerAddressList> ExtractBalancerAddresses(
const ServerAddressList& addresses) {
auto balancer_addresses = MakeUnique<ServerAddressList>();
for (size_t i = 0; i < addresses.size(); ++i) {
if (addresses[i].IsBalancer()) {
balancer_addresses->emplace_back(addresses[i]);
}
}
return balancer_addresses;
}
/* Returns the channel args for the LB channel, used to create a bidirectional
* stream for the reception of load balancing updates.
*
* Inputs:
* - \a addresses: corresponding to the balancers.
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
* - \a args: other args inherited from the xds policy. */
grpc_channel_args* BuildBalancerChannelArgs(
const ServerAddressList& addresses,
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
UniquePtr<ServerAddressList> balancer_addresses =
ExtractBalancerAddresses(addresses);
// Channel args to remove.
// Returns the channel args for the LB channel, used to create a bidirectional
// stream for the reception of load balancing updates.
grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
// the LB channel.
GRPC_ARG_LB_POLICY_NAME,
// The service config that contains the LB config. We don't want to
// recursively use xds in the LB channel.
GRPC_ARG_SERVICE_CONFIG,
// The channel arg for the server URI, since that will be different for
// the LB channel than for the parent channel. The client channel
// factory will re-add this arg with the right value.
GRPC_ARG_SERVER_URI,
// The resolved addresses, which will be generated by the name resolver
// used in the LB channel. Note that the LB channel will use the fake
// resolver, so this won't actually generate a query to DNS (or some
// other name service). However, the addresses returned by the fake
// resolver will have is_balancer=false, whereas our own addresses have
// is_balancer=true. We need the LB channel to return addresses with
// is_balancer=false so that it does not wind up recursively using the
// xds LB policy, as per the special case logic in client_channel.c.
// used in the LB channel.
GRPC_ARG_SERVER_ADDRESS_LIST,
// The fake resolver response generator, because we are replacing it
// with the one from the xds policy, used to propagate updates to
// the LB channel.
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
// The LB channel should use the authority indicated by the target
// authority table (see \a grpc_lb_policy_xds_modify_lb_channel_args),
// as opposed to the authority from the parent channel.
@ -916,14 +1044,6 @@ grpc_channel_args* BuildBalancerChannelArgs(
};
// Channel args to add.
const grpc_arg args_to_add[] = {
// New server address list.
// Note that we pass these in both when creating the LB channel
// and via the fake resolver. The latter is what actually gets used.
CreateServerAddressListChannelArg(balancer_addresses.get()),
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator),
// A channel arg indicating the target is a xds load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1),
@ -944,21 +1064,8 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
BackOff::Options()
.set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
1000)
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
// Initialization.
gpr_mu_init(&lb_channel_mu_);
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&XdsLb::OnBalancerChannelConnectivityChangedLocked, this,
grpc_combiner_scheduler(args.combiner));
XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
gpr_mu_init(&lb_chand_mu_);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -982,7 +1089,7 @@ XdsLb::XdsLb(Args args)
}
XdsLb::~XdsLb() {
gpr_mu_destroy(&lb_channel_mu_);
gpr_mu_destroy(&lb_chand_mu_);
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
if (serverlist_ != nullptr) {
@ -992,10 +1099,6 @@ XdsLb::~XdsLb() {
void XdsLb::ShutdownLocked() {
shutting_down_ = true;
lb_calld_.reset();
if (retry_timer_callback_pending_) {
grpc_timer_cancel(&lb_call_retry_timer_);
}
if (fallback_timer_callback_pending_) {
grpc_timer_cancel(&lb_fallback_timer_);
}
@ -1004,11 +1107,10 @@ void XdsLb::ShutdownLocked() {
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if (lb_channel_ != nullptr) {
gpr_mu_lock(&lb_channel_mu_);
grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr;
gpr_mu_unlock(&lb_channel_mu_);
{
MutexLock lock(&lb_chand_mu_);
lb_chand_.reset();
pending_lb_chand_.reset();
}
}
@ -1017,8 +1119,11 @@ void XdsLb::ShutdownLocked() {
//
void XdsLb::ResetBackoffLocked() {
if (lb_channel_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_channel_);
if (lb_chand_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_chand_->channel());
}
if (pending_lb_chand_ != nullptr) {
grpc_channel_reset_connect_backoff(pending_lb_chand_->channel());
}
if (child_policy_ != nullptr) {
child_policy_->ResetBackoffLocked();
@ -1027,12 +1132,19 @@ void XdsLb::ResetBackoffLocked() {
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
// delegate to the child_policy_ to fill the children subchannels.
// Delegate to the child_policy_ to fill the children subchannels.
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
MutexLock lock(&lb_channel_mu_);
if (lb_channel_ != nullptr) {
MutexLock lock(&lb_chand_mu_);
if (lb_chand_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
grpc_channel_get_channelz_node(lb_chand_->channel());
if (channel_node != nullptr) {
child_channels->push_back(channel_node->uuid());
}
}
if (pending_lb_chand_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(pending_lb_chand_->channel());
if (channel_node != nullptr) {
child_channels->push_back(channel_node->uuid());
}
@ -1059,22 +1171,29 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
args_ = grpc_channel_args_copy_and_add_and_remove(
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
grpc_channel_args* lb_channel_args =
BuildBalancerChannelArgs(*addresses, response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_);
gpr_mu_lock(&lb_channel_mu_);
lb_channel_ =
channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
gpr_mu_unlock(&lb_channel_mu_);
GPR_ASSERT(lb_channel_ != nullptr);
gpr_free(uri_str);
grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(&args);
// Create an LB channel if we don't have one yet or the balancer name has
// changed from the last received one.
bool create_lb_channel = lb_chand_ == nullptr;
if (lb_chand_ != nullptr) {
UniquePtr<char> last_balancer_name(
grpc_channel_get_target(LatestLbChannel()->channel()));
create_lb_channel =
strcmp(last_balancer_name.get(), balancer_name_.get()) != 0;
}
if (create_lb_channel) {
OrphanablePtr<BalancerChannelState> lb_chand =
MakeOrphanable<BalancerChannelState>(balancer_name_.get(),
*lb_channel_args, Ref());
if (lb_chand_ == nullptr || !lb_chand_->HasActiveCall()) {
GPR_ASSERT(pending_lb_chand_ == nullptr);
// If we do not have a working LB channel yet, use the newly created one.
lb_chand_ = std::move(lb_chand);
} else {
// Otherwise, wait until the new LB channel to be ready to swap it in.
pending_lb_chand_ = std::move(lb_chand);
}
}
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
response_generator_->SetResponse(lb_channel_args);
grpc_channel_args_destroy(lb_channel_args);
}
@ -1114,12 +1233,13 @@ void XdsLb::ParseLbConfig(Config* xds_config) {
void XdsLb::UpdateLocked(const grpc_channel_args& args,
RefCountedPtr<Config> lb_config) {
const bool is_initial_update = lb_channel_ == nullptr;
const bool is_initial_update = lb_chand_ == nullptr;
ParseLbConfig(lb_config.get());
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
if (balancer_name_ == nullptr) {
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this);
return;
}
ProcessChannelArgsLocked(args);
// Update the existing child policy.
@ -1139,24 +1259,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args,
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
StartBalancerCallLocked();
} else if (!watching_lb_channel_) {
// If this is not the initial update and we're not already watching
// the LB channel's connectivity state, start a watch now. This
// ensures that we'll know when to switch to a new balancer call.
lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
lb_channel_, true /* try to connect */);
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
watching_lb_channel_ = true;
// Ref held by closure.
Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(interested_parties()),
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
nullptr);
}
}
@ -1164,20 +1266,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args,
// code for balancer channel and call
//
void XdsLb::StartBalancerCallLocked() {
GPR_ASSERT(lb_channel_ != nullptr);
if (shutting_down_) return;
// Init the LB call data.
GPR_ASSERT(lb_calld_ == nullptr);
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
this, lb_channel_, lb_calld_.get());
}
lb_calld_->StartQuery();
}
void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
xdslb_policy->fallback_timer_callback_pending_ = false;
@ -1194,88 +1282,6 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
void XdsLb::StartBalancerCallRetryTimerLocked() {
grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Connection to LB server lost...", this);
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
if (timeout > 0) {
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.",
this, timeout);
} else {
gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active immediately.", this);
}
}
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
self.release();
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &XdsLb::OnBalancerCallRetryTimerLocked,
this, grpc_combiner_scheduler(combiner()));
retry_timer_callback_pending_ = true;
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
void XdsLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
xdslb_policy->retry_timer_callback_pending_ = false;
if (!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
xdslb_policy->lb_calld_ == nullptr) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Restarting call to LB server",
xdslb_policy);
}
xdslb_policy->StartBalancerCallLocked();
}
xdslb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
if (xdslb_policy->shutting_down_) goto done;
// Re-initialize the lb_call. This should also take care of updating the
// child policy. Note that the current child policy, if any, will
// stay in effect until an update from the new lb_call is received.
switch (xdslb_policy->lb_channel_connectivity_) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
// Keep watching the LB channel.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(xdslb_policy->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
xdslb_policy->interested_parties()),
&xdslb_policy->lb_channel_connectivity_,
&xdslb_policy->lb_channel_on_connectivity_changed_, nullptr);
break;
}
// The LB channel may be IDLE because it's shut down before the update.
// Restart the LB call to kick the LB channel into gear.
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:
xdslb_policy->lb_calld_.reset();
if (xdslb_policy->retry_timer_callback_pending_) {
grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_);
}
xdslb_policy->lb_call_backoff_.Reset();
xdslb_policy->StartBalancerCallLocked();
// Fall through.
case GRPC_CHANNEL_SHUTDOWN:
done:
xdslb_policy->watching_lb_channel_ = false;
xdslb_policy->Unref(DEBUG_LOCATION,
"watch_lb_channel_connectivity_cb_shutdown");
}
}
//
// code for interacting with the child policy
//
@ -1360,18 +1366,6 @@ class XdsFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const ServerAddressList* addresses =
FindServerAddressListChannelArg(args.args);
if (addresses == nullptr) return nullptr;
bool found_balancer_address = false;
for (size_t i = 0; i < addresses->size(); ++i) {
if ((*addresses)[i].IsBalancer()) {
found_balancer_address = true;
break;
}
}
if (!found_balancer_address) return nullptr;
return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(std::move(args)));
}

@ -33,55 +33,12 @@
#include "src/core/lib/security/transport/target_authority_table.h"
#include "src/core/lib/slice/slice_internal.h"
namespace grpc_core {
namespace {
int BalancerNameCmp(const grpc_core::UniquePtr<char>& a,
const grpc_core::UniquePtr<char>& b) {
return strcmp(a.get(), b.get());
}
RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable(
const ServerAddressList& addresses) {
TargetAuthorityTable::Entry* target_authority_entries =
static_cast<TargetAuthorityTable::Entry*>(
gpr_zalloc(sizeof(*target_authority_entries) * addresses.size()));
for (size_t i = 0; i < addresses.size(); ++i) {
char* addr_str;
GPR_ASSERT(
grpc_sockaddr_to_string(&addr_str, &addresses[i].address(), true) > 0);
target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str);
gpr_free(addr_str);
char* balancer_name = grpc_channel_arg_get_string(grpc_channel_args_find(
addresses[i].args(), GRPC_ARG_ADDRESS_BALANCER_NAME));
target_authority_entries[i].value.reset(gpr_strdup(balancer_name));
}
RefCountedPtr<TargetAuthorityTable> target_authority_table =
TargetAuthorityTable::Create(addresses.size(), target_authority_entries,
BalancerNameCmp);
gpr_free(target_authority_entries);
return target_authority_table;
}
} // namespace
} // namespace grpc_core
grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args(
grpc_channel_args* args) {
const char* args_to_remove[1];
size_t num_args_to_remove = 0;
grpc_arg args_to_add[2];
size_t num_args_to_add = 0;
// Add arg for targets info table.
grpc_core::ServerAddressList* addresses =
grpc_core::FindServerAddressListChannelArg(args);
GPR_ASSERT(addresses != nullptr);
grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable>
target_authority_table =
grpc_core::CreateTargetAuthorityTable(*addresses);
args_to_add[num_args_to_add++] =
grpc_core::CreateTargetAuthorityTableChannelArg(
target_authority_table.get());
// Substitute the channel credentials with a version without call
// credentials: the load balancer is not necessarily trusted to handle
// bearer token credentials.

@ -86,7 +86,14 @@ FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
channel_args_ = grpc_channel_args_copy(args.args);
FakeResolverResponseGenerator* response_generator =
FakeResolverResponseGenerator::GetFromArgs(args.args);
if (response_generator != nullptr) response_generator->resolver_ = this;
if (response_generator != nullptr) {
response_generator->resolver_ = this;
if (response_generator->response_ != nullptr) {
response_generator->SetResponse(response_generator->response_);
grpc_channel_args_destroy(response_generator->response_);
response_generator->response_ = nullptr;
}
}
}
FakeResolver::~FakeResolver() {
@ -114,6 +121,9 @@ void FakeResolver::RequestReresolutionLocked() {
void FakeResolver::MaybeFinishNextLocked() {
if (next_completion_ != nullptr &&
(next_results_ != nullptr || return_failure_)) {
// When both next_results_ and channel_args_ contain an arg with the same
// name, only the one in next_results_ will be kept since next_results_ is
// before channel_args_.
*target_result_ =
return_failure_ ? nullptr
: grpc_channel_args_union(next_results_, channel_args_);
@ -157,15 +167,19 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
void FakeResolverResponseGenerator::SetResponse(grpc_channel_args* response) {
GPR_ASSERT(response != nullptr);
GPR_ASSERT(resolver_ != nullptr);
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->generator = this;
closure_arg->response = grpc_channel_args_copy(response);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked,
closure_arg,
grpc_combiner_scheduler(resolver_->combiner())),
GRPC_ERROR_NONE);
if (resolver_ != nullptr) {
SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
closure_arg->generator = this;
closure_arg->response = grpc_channel_args_copy(response);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked,
closure_arg,
grpc_combiner_scheduler(resolver_->combiner())),
GRPC_ERROR_NONE);
} else {
GPR_ASSERT(response_ == nullptr);
response_ = grpc_channel_args_copy(response);
}
}
void FakeResolverResponseGenerator::SetReresolutionResponseLocked(

@ -44,7 +44,9 @@ class FakeResolverResponseGenerator
FakeResolverResponseGenerator() {}
// Instructs the fake resolver associated with the response generator
// instance to trigger a new resolution with the specified response.
// instance to trigger a new resolution with the specified response. If the
// resolver is not available yet, delays response setting until it is. This
// can be called at most once before the resolver is available.
void SetResponse(grpc_channel_args* next_response);
// Sets the re-resolution response, which is returned by the fake resolver
@ -79,6 +81,7 @@ class FakeResolverResponseGenerator
static void SetFailureLocked(void* arg, grpc_error* error);
FakeResolver* resolver_ = nullptr; // Do not own.
grpc_channel_args* response_ = nullptr;
};
} // namespace grpc_core

@ -26,6 +26,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/transport/chttp2/alpn/alpn.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
@ -53,8 +55,11 @@ class grpc_fake_channel_security_connector final
target_(gpr_strdup(target)),
expected_targets_(
gpr_strdup(grpc_fake_transport_get_expected_targets(args))),
is_lb_channel_(grpc_core::FindTargetAuthorityTableInArgs(args) !=
nullptr) {
is_lb_channel_(
grpc_channel_args_find(
args, GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER) != nullptr ||
grpc_channel_args_find(
args, GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER) != nullptr) {
const grpc_arg* target_name_override_arg =
grpc_channel_args_find(args, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
if (target_name_override_arg != nullptr) {

@ -439,6 +439,28 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "xds_end2end_test",
srcs = ["xds_end2end_test.cc"],
external_deps = [
"gmock",
"gtest",
],
deps = [
":test_service_impl",
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpc_resolver_fake",
"//src/proto/grpc/lb/v1:load_balancer_proto",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "proto_server_reflection_test",
srcs = ["proto_server_reflection_test.cc"],

File diff suppressed because it is too large Load Diff

@ -5002,6 +5002,28 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util"
],
"headers": [
"src/proto/grpc/lb/v1/load_balancer.grpc.pb.h",
"src/proto/grpc/lb/v1/load_balancer.pb.h",
"src/proto/grpc/lb/v1/load_balancer_mock.grpc.pb.h"
],
"is_filegroup": false,
"language": "c++",
"name": "xds_end2end_test",
"src": [
"test/cpp/end2end/xds_end2end_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",

@ -5686,6 +5686,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "xds_end2end_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save