Move XdsClient instantiation into xds resolver.

pull/20476/head
Mark D. Roth, 5 years ago
parent dd89123f0d
commit 1564207245
1. BUILD (17)
2. CMakeLists.txt (14)
3. Makefile (14)
4. build.yaml (12)
5. grpc.gyp (14)
6. src/core/ext/filters/client_channel/lb_policy/xds/xds.cc (83)
7. src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc (71)
8. src/core/ext/filters/client_channel/xds/xds_client.cc (379)
9. src/core/ext/filters/client_channel/xds/xds_client.h (63)
10. test/cpp/end2end/xds_end2end_test.cc (207)

BUILD

@ -320,6 +320,7 @@ grpc_cc_library(
"grpc_common",
"grpc_lb_policy_grpclb",
"grpc_lb_policy_xds",
"grpc_resolver_xds",
],
)
@ -336,6 +337,7 @@ grpc_cc_library(
"grpc_common",
"grpc_lb_policy_grpclb_secure",
"grpc_lb_policy_xds_secure",
"grpc_resolver_xds_secure",
"grpc_secure",
"grpc_transport_chttp2_client_secure",
"grpc_transport_chttp2_server_secure",
@ -994,7 +996,6 @@ grpc_cc_library(
"grpc_resolver_fake",
"grpc_resolver_dns_native",
"grpc_resolver_sockaddr",
"grpc_resolver_xds",
"grpc_transport_chttp2_client_insecure",
"grpc_transport_chttp2_server_insecure",
"grpc_transport_inproc",
@ -1581,6 +1582,20 @@ grpc_cc_library(
deps = [
"grpc_base",
"grpc_client_channel",
"grpc_xds_client",
],
)
grpc_cc_library(
name = "grpc_resolver_xds_secure",
srcs = [
"src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc",
],
language = "c++",
deps = [
"grpc_base",
"grpc_client_channel",
"grpc_xds_client_secure",
],
)

CMakeLists.txt

@ -2821,13 +2821,6 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c
src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
src/core/ext/filters/client_channel/xds/xds_api.cc
src/core/ext/filters/client_channel/xds/xds_bootstrap.cc
src/core/ext/filters/client_channel/xds/xds_channel.cc
@ -2853,6 +2846,13 @@ add_library(grpc_unsecure
src/core/ext/upb-generated/envoy/api/v2/core/protocol.upb.c
src/core/ext/upb-generated/envoy/type/percent.upb.c
src/core/ext/upb-generated/envoy/type/range.upb.c
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c
src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/filters/census/grpc_context.cc

Makefile

@ -5320,13 +5320,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \
src/core/ext/filters/client_channel/xds/xds_api.cc \
src/core/ext/filters/client_channel/xds/xds_bootstrap.cc \
src/core/ext/filters/client_channel/xds/xds_channel.cc \
@ -5352,6 +5345,13 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/upb-generated/envoy/api/v2/core/protocol.upb.c \
src/core/ext/upb-generated/envoy/type/percent.upb.c \
src/core/ext/upb-generated/envoy/type/range.upb.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \
src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \
src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/census/grpc_context.cc \

build.yaml

@ -1230,6 +1230,16 @@ filegroups:
uses:
- grpc_base
- grpc_client_channel
- grpc_xds_client
- name: grpc_resolver_xds_secure
src:
- src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
plugin: grpc_resolver_xds
uses:
- grpc_base
- grpc_client_channel
- grpc_secure
- grpc_xds_client_secure
- name: grpc_secure
public_headers:
- include/grpc/grpc_security.h
@ -1678,7 +1688,7 @@ libs:
- grpc_resolver_dns_native
- grpc_resolver_sockaddr
- grpc_resolver_fake
- grpc_resolver_xds
- grpc_resolver_xds_secure
- grpc_secure
- census
- grpc_client_idle_filter

grpc.gyp

@ -1416,13 +1416,6 @@
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc',
'src/core/ext/filters/client_channel/xds/xds_api.cc',
'src/core/ext/filters/client_channel/xds/xds_bootstrap.cc',
'src/core/ext/filters/client_channel/xds/xds_channel.cc',
@ -1448,6 +1441,13 @@
'src/core/ext/upb-generated/envoy/api/v2/core/protocol.upb.c',
'src/core/ext/upb-generated/envoy/type/percent.upb.c',
'src/core/ext/upb-generated/envoy/type/range.upb.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc',
'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c',
'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/census/grpc_context.cc',

src/core/ext/filters/client_channel/lb_policy/xds/xds.cc

@ -373,6 +373,11 @@ class XdsLb : public LoadBalancingPolicy {
const char* name, const grpc_channel_args* args);
void MaybeExitFallbackMode();
XdsClient* xds_client() const {
return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get()
: xds_client_.get();
}
// Name of the backend server to connect to.
const char* server_name_ = nullptr;
@ -382,7 +387,11 @@ class XdsLb : public LoadBalancingPolicy {
// Internal state.
bool shutting_down_ = false;
// The xds client.
// The xds client and endpoint watcher.
// If we get the XdsClient from the channel, we store it in
// xds_client_from_channel_; if we create it ourselves, we store it in
// xds_client_.
RefCountedPtr<XdsClient> xds_client_from_channel_;
OrphanablePtr<XdsClient> xds_client_;
// A pointer to the endpoint watcher, to be used when cancelling the watch.
// Note that this is not owned, so this pointer must never be dereferenced.
@ -580,6 +589,10 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
: xds_policy_(std::move(xds_policy)) {}
void OnEndpointChanged(EdsUpdate update) override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Received EDS update from xds client",
xds_policy_.get());
}
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
if (update.drop_all) xds_policy_->MaybeExitFallbackMode();
@ -647,6 +660,7 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
xds_client_from_channel_(XdsClient::GetFromChannelArgs(*args.args)),
lb_fallback_timeout_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS,
{GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})),
@ -657,6 +671,11 @@ XdsLb::XdsLb(Args args)
args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS,
{GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})),
priority_list_(this) {
if (xds_client_from_channel_ != nullptr &&
GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this,
xds_client_from_channel_.get());
}
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -699,12 +718,11 @@ void XdsLb::ShutdownLocked() {
pending_fallback_policy_.reset();
// Cancel the endpoint watch here instead of in our dtor, because the
// watcher holds a ref to us.
if (xds_client_ != nullptr) {
xds_client_->CancelEndpointDataWatch(StringView(server_name_),
endpoint_watcher_);
xds_client_->RemoveClientStats(StringView(server_name_), &client_stats_);
xds_client_.reset();
}
xds_client()->CancelEndpointDataWatch(StringView(server_name_),
endpoint_watcher_);
xds_client()->RemoveClientStats(StringView(server_name_), &client_stats_);
xds_client_from_channel_.reset();
xds_client_.reset();
}
//
@ -712,9 +730,9 @@ void XdsLb::ShutdownLocked() {
//
void XdsLb::ResetBackoffLocked() {
// TODO(roth): When we instantiate the XdsClient in the resolver
// instead of in this LB policy, this should be done in the resolver
// instead of here.
// When the XdsClient is instantiated in the resolver instead of in this
// LB policy, this is done via the resolver, so we don't need to do it
// for xds_client_from_channel_ here.
if (xds_client_ != nullptr) xds_client_->ResetBackoff();
priority_list_.ResetBackoffLocked();
if (fallback_policy_ != nullptr) {
@ -726,7 +744,10 @@ void XdsLb::ResetBackoffLocked() {
}
void XdsLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = xds_client_ == nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Received update", this);
}
const bool is_initial_update = args_ == nullptr;
// Update config.
auto* xds_config = static_cast<const ParsedXdsConfig*>(args.config.get());
child_policy_config_ = xds_config->child_policy();
@ -737,32 +758,32 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
grpc_channel_args_destroy(args_);
args_ = args.args;
args.args = nullptr;
// Create an xds client if we don't have one yet.
if (xds_client_ == nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
combiner(), interested_parties(), StringView(server_name_),
nullptr /* service config watcher */, *args_, &error);
// TODO(roth): When we move instantiation of the XdsClient into the
// xds resolver, add proper error handling there.
GPR_ASSERT(error == GRPC_ERROR_NONE);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Created xds client %p", this,
xds_client_.get());
}
endpoint_watcher_ = New<EndpointWatcher>(Ref());
xds_client_->WatchEndpointData(
StringView(server_name_),
UniquePtr<XdsClient::EndpointWatcherInterface>(endpoint_watcher_));
xds_client_->AddClientStats(StringView(server_name_), &client_stats_);
}
// Update priority list.
priority_list_.UpdateLocked();
// Update the existing fallback policy. The fallback policy config and/or the
// fallback addresses may be new.
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks.
if (is_initial_update) {
// Initialize XdsClient.
if (xds_client_from_channel_ == nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
combiner(), interested_parties(), StringView(server_name_),
nullptr /* service config watcher */, *args_, &error);
// TODO(roth): If we decide that we care about fallback mode, add
// proper error handling here.
GPR_ASSERT(error == GRPC_ERROR_NONE);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Created xds client %p", this,
xds_client_.get());
}
}
auto watcher = MakeUnique<EndpointWatcher>(Ref());
endpoint_watcher_ = watcher.get();
xds_client()->WatchEndpointData(StringView(server_name_),
std::move(watcher));
xds_client()->AddClientStats(StringView(server_name_), &client_stats_);
// Start fallback-at-startup checks.
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
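A note on the watcher-ownership pattern used in UpdateLocked() above: the
policy hands the XdsClient sole ownership of the watcher via a UniquePtr but
keeps a raw pointer so it can cancel the watch at shutdown, which is why the
comment above warns that endpoint_watcher_ must never be dereferenced. A
minimal sketch of the pattern, assuming only the interface methods visible in
this diff (any additional pure-virtual hooks on EndpointWatcherInterface,
such as an error callback, would also need overriding):

class MyWatcher : public XdsClient::EndpointWatcherInterface {
 public:
  void OnEndpointChanged(EdsUpdate update) override {
    // React to the new endpoint assignment here.
  }
};

void RegisterAndLaterCancel(XdsClient* xds_client, const char* server_name) {
  // Ownership of the watcher moves into the XdsClient...
  auto watcher = MakeUnique<MyWatcher>();
  MyWatcher* watcher_raw = watcher.get();  // ...but keep a raw handle.
  xds_client->WatchEndpointData(StringView(server_name), std::move(watcher));
  // At shutdown, cancel via the raw handle. The handle is an identity key
  // only; it must never be dereferenced, since the XdsClient owns (and may
  // have already destroyed) the object.
  xds_client->CancelEndpointDataWatch(StringView(server_name), watcher_raw);
}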

src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc

@ -19,39 +19,84 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/xds/xds_client.h"
#include "src/core/lib/gprpp/string_view.h"
namespace grpc_core {
namespace {
//
// XdsResolver
//
class XdsResolver : public Resolver {
public:
explicit XdsResolver(ResolverArgs args)
: Resolver(args.combiner, std::move(args.result_handler)),
args_(grpc_channel_args_copy(args.args)) {}
args_(grpc_channel_args_copy(args.args)),
interested_parties_(args.pollset_set) {
char* path = args.uri->path;
if (path[0] == '/') ++path;
server_name_.reset(gpr_strdup(path));
}
~XdsResolver() override { grpc_channel_args_destroy(args_); }
void StartLocked() override;
void ShutdownLocked() override{};
void ShutdownLocked() override { xds_client_.reset(); }
private:
class ServiceConfigWatcher : public XdsClient::ServiceConfigWatcherInterface {
public:
explicit ServiceConfigWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnServiceConfigChanged(
RefCountedPtr<ServiceConfig> service_config) override;
void OnError(grpc_error* error) override;
private:
RefCountedPtr<XdsResolver> resolver_;
};
UniquePtr<char> server_name_;
const grpc_channel_args* args_;
grpc_pollset_set* interested_parties_;
OrphanablePtr<XdsClient> xds_client_;
};
void XdsResolver::StartLocked() {
static const char* service_config =
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_experimental\":{} }\n"
" ]\n"
"}";
void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged(
RefCountedPtr<ServiceConfig> service_config) {
grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
Result result;
result.args = args_;
args_ = nullptr;
result.args =
grpc_channel_args_copy_and_add(resolver_->args_, &xds_client_arg, 1);
result.service_config = std::move(service_config);
resolver_->result_handler()->ReturnResult(std::move(result));
}
void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
Result result;
result.args =
grpc_channel_args_copy_and_add(resolver_->args_, &xds_client_arg, 1);
result.service_config_error = error;
resolver_->result_handler()->ReturnResult(std::move(result));
}
void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
result.service_config = ServiceConfig::Create(service_config, &error);
result_handler()->ReturnResult(std::move(result));
xds_client_ = MakeOrphanable<XdsClient>(
combiner(), interested_parties_, StringView(server_name_.get()),
MakeUnique<ServiceConfigWatcher>(Ref()), *args_, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in "
"TRANSIENT_FAILURE: %s",
grpc_error_string(error));
result_handler()->ReturnError(error);
}
}
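Taken together with the xds.cc changes above, the handoff works like this:
the resolver owns the XdsClient and advertises it through a pointer channel
arg, and the LB policy fishes the same instance back out of its channel args.
Both calls already appear in this diff; the sketch below only puts the two
sides next to each other:

// Resolver side (see OnServiceConfigChanged above): attach the client to
// every result so it travels down to the LB policy with the channel args.
grpc_arg xds_client_arg = xds_client_->MakeChannelArg();
result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1);

// LB policy side (see the XdsLb constructor in xds.cc above): retrieve it.
// This returns null under the fake resolver, in which case XdsLb falls back
// to creating its own client in UpdateLocked().
xds_client_from_channel_ = XdsClient::GetFromChannelArgs(*args.args);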
//

src/core/ext/filters/client_channel/xds/xds_client.cc

@ -68,222 +68,181 @@ namespace grpc_core {
TraceFlag grpc_xds_client_trace(false, "xds_client");
// Contains a channel to the xds server and all the data related to the
// channel. Holds a ref to the xds client object.
// TODO(roth): This is separate from the XdsClient object because it was
// originally designed to be able to swap itself out in case the
// balancer name changed. Now that the balancer name is going to be
// coming from the bootstrap file, we don't really need this level of
// indirection unless we decide to support watching the bootstrap file
// for changes. At some point, if we decide that we're never going to
// need to do that, then we can eliminate this class and move its
// contents directly into the XdsClient class.
class XdsClient::ChannelState : public InternallyRefCounted<ChannelState> {
//
// Internal class declarations
//
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
template <typename T>
class XdsClient::ChannelState::RetryableCall
: public InternallyRefCounted<RetryableCall<T>> {
public:
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
template <typename T>
class RetryableCall : public InternallyRefCounted<RetryableCall<T>> {
public:
explicit RetryableCall(RefCountedPtr<ChannelState> chand);
explicit RetryableCall(RefCountedPtr<ChannelState> chand);
void Orphan() override;
void Orphan() override;
void OnCallFinishedLocked();
void OnCallFinishedLocked();
T* calld() const { return calld_.get(); }
ChannelState* chand() const { return chand_.get(); }
T* calld() const { return calld_.get(); }
ChannelState* chand() const { return chand_.get(); }
private:
void StartNewCallLocked();
void StartRetryTimerLocked();
static void OnRetryTimerLocked(void* arg, grpc_error* error);
// The wrapped call that talks to the xds server. It's instantiated
// every time we start a new call. It's null during call retry backoff.
OrphanablePtr<T> calld_;
// The owning xds channel.
RefCountedPtr<ChannelState> chand_;
// Retry state.
BackOff backoff_;
grpc_timer retry_timer_;
grpc_closure on_retry_timer_;
bool retry_timer_callback_pending_ = false;
bool shutting_down_ = false;
};
bool IsCurrentCallOnChannel() const;
// Contains an ADS call to the xds server.
class AdsCallState : public InternallyRefCounted<AdsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
~AdsCallState() override;
private:
void StartNewCallLocked();
void StartRetryTimerLocked();
static void OnRetryTimerLocked(void* arg, grpc_error* error);
// The wrapped xds call that talks to the xds server. It's instantiated
// every time we start a new call. It's null during call retry backoff.
OrphanablePtr<T> calld_;
// The owning xds channel.
RefCountedPtr<ChannelState> chand_;
// Retry state.
BackOff backoff_;
grpc_timer retry_timer_;
grpc_closure on_retry_timer_;
bool retry_timer_callback_pending_ = false;
void Orphan() override;
bool shutting_down_ = false;
};
RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
// Contains an ADS call to the xds server.
class XdsClient::ChannelState::AdsCallState
: public InternallyRefCounted<AdsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
~AdsCallState() override;
private:
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
void Orphan() override;
bool IsCurrentCallOnChannel() const;
RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<AdsCallState>> parent_;
bool seen_response_ = false;
private:
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
// Always non-NULL.
grpc_call* call_;
bool IsCurrentCallOnChannel() const;
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<AdsCallState>> parent_;
bool seen_response_ = false;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
// Always non-NULL.
grpc_call* call_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
};
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
// Contains an LRS call to the xds server.
class LrsCallState : public InternallyRefCounted<LrsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
~LrsCallState() override;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
void Orphan() override;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
};
void MaybeStartReportingLocked();
// Contains an LRS call to the xds server.
class XdsClient::ChannelState::LrsCallState
: public InternallyRefCounted<LrsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
~LrsCallState() override;
RetryableCall<LrsCallState>* parent() { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
void Orphan() override;
private:
// Reports client-side load stats according to a fixed interval.
class Reporter : public InternallyRefCounted<Reporter> {
public:
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
: parent_(std::move(parent)), report_interval_(report_interval) {
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
ScheduleNextReportLocked();
}
void MaybeStartReportingLocked();
void Orphan() override;
RetryableCall<LrsCallState>* parent() { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
private:
void ScheduleNextReportLocked();
static void OnNextReportTimerLocked(void* arg, grpc_error* error);
void SendReportLocked();
static void OnReportDoneLocked(void* arg, grpc_error* error);
private:
// Reports client-side load stats according to a fixed interval.
class Reporter : public InternallyRefCounted<Reporter> {
public:
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
: parent_(std::move(parent)), report_interval_(report_interval) {
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
ScheduleNextReportLocked();
}
bool IsCurrentReporterOnCall() const {
return this == parent_->reporter_.get();
}
XdsClient* xds_client() const { return parent_->xds_client(); }
// The owning LRS call.
RefCountedPtr<LrsCallState> parent_;
// The load reporting state.
const grpc_millis report_interval_;
bool last_report_counters_were_zero_ = false;
bool next_report_timer_callback_pending_ = false;
grpc_timer next_report_timer_;
grpc_closure on_next_report_timer_;
grpc_closure on_report_done_;
};
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
bool IsCurrentCallOnChannel() const;
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<LrsCallState>> parent_;
bool seen_response_ = false;
// Always non-NULL.
grpc_call* call_;
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure on_initial_request_sent_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
// Load reporting state.
UniquePtr<char> cluster_name_;
grpc_millis load_reporting_interval_ = 0;
OrphanablePtr<Reporter> reporter_;
};
void Orphan() override;
ChannelState(RefCountedPtr<XdsClient> xds_client,
const grpc_channel_args& args);
~ChannelState();
private:
void ScheduleNextReportLocked();
static void OnNextReportTimerLocked(void* arg, grpc_error* error);
void SendReportLocked();
static void OnReportDoneLocked(void* arg, grpc_error* error);
void Orphan() override;
bool IsCurrentReporterOnCall() const {
return this == parent_->reporter_.get();
}
XdsClient* xds_client() const { return parent_->xds_client(); }
// The owning LRS call.
RefCountedPtr<LrsCallState> parent_;
// The load reporting state.
const grpc_millis report_interval_;
bool last_report_counters_were_zero_ = false;
bool next_report_timer_callback_pending_ = false;
grpc_timer next_report_timer_;
grpc_closure on_next_report_timer_;
grpc_closure on_report_done_;
};
grpc_channel* channel() const { return channel_; }
XdsClient* xds_client() const { return xds_client_.get(); }
AdsCallState* ads_calld() const { return ads_calld_->calld(); }
LrsCallState* lrs_calld() const { return lrs_calld_->calld(); }
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
void MaybeStartAdsCall();
void StopAdsCall();
bool IsCurrentCallOnChannel() const;
void MaybeStartLrsCall();
void StopLrsCall();
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<LrsCallState>> parent_;
bool seen_response_ = false;
bool HasActiveAdsCall() const { return ads_calld_->calld() != nullptr; }
// Always non-NULL.
grpc_call* call_;
void StartConnectivityWatchLocked();
void CancelConnectivityWatchLocked();
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
private:
class StateWatcher;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure on_initial_request_sent_;
// The owning xds client.
RefCountedPtr<XdsClient> xds_client_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
// The channel and its status.
grpc_channel* channel_;
bool shutting_down_ = false;
StateWatcher* watcher_ = nullptr;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
// The retryable XDS calls.
OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
// Load reporting state.
UniquePtr<char> cluster_name_;
grpc_millis load_reporting_interval_ = 0;
OrphanablePtr<Reporter> reporter_;
};
//
@ -400,6 +359,20 @@ void XdsClient::ChannelState::Orphan() {
Unref(DEBUG_LOCATION, "ChannelState+orphaned");
}
XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
const {
return ads_calld_->calld();
}
XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
const {
return lrs_calld_->calld();
}
bool XdsClient::ChannelState::HasActiveAdsCall() const {
return ads_calld_->calld() != nullptr;
}
void XdsClient::ChannelState::MaybeStartAdsCall() {
if (ads_calld_ != nullptr) return;
ads_calld_.reset(New<RetryableCall<AdsCallState>>(
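ads_calld(), lrs_calld(), and HasActiveAdsCall() move out of line here
because the header (see xds_client.h below) now only forward-declares
RetryableCall, AdsCallState, and LrsCallState inside ChannelState, and
dereferencing ads_calld_ requires the complete RetryableCall<AdsCallState>
type, which is defined only in this .cc file. The same idiom in miniature,
with illustrative names:

class Chan {
 public:
  template <typename T> class Wrapper;  // forward declaration only
  class Call;                           // forward declaration only
  Call* call() const;  // cannot be inline: needs Wrapper<Call> complete
 private:
  Wrapper<Call>* wrapped_ = nullptr;
};

class Chan::Call {};

template <typename T>
class Chan::Wrapper {
 public:
  T* get() const { return ptr_; }
 private:
  T* ptr_ = nullptr;
};

// Defined out of line, where Wrapper<Call> is a complete type:
Chan::Call* Chan::call() const { return wrapped_->get(); }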
@ -1213,7 +1186,13 @@ XdsClient::XdsClient(grpc_combiner* combiner,
}
chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args);
// TODO(roth): Start LDS call.
if (service_config_watcher_ != nullptr) {
// TODO(juanlishen): Start LDS call and do not return service config
// until we get the first LDS response.
GRPC_CLOSURE_INIT(&service_config_notify_, NotifyOnServiceConfig,
Ref().release(), grpc_combiner_scheduler(combiner_));
GRPC_CLOSURE_SCHED(&service_config_notify_, GRPC_ERROR_NONE);
}
}
XdsClient::~XdsClient() { GRPC_COMBINER_UNREF(combiner_, "xds_client"); }
@ -1226,12 +1205,12 @@ void XdsClient::Orphan() {
void XdsClient::WatchClusterData(StringView cluster,
UniquePtr<ClusterWatcherInterface> watcher) {
// TODO(roth): Implement.
// TODO(juanlishen): Implement.
}
void XdsClient::CancelClusterDataWatch(StringView cluster,
ClusterWatcherInterface* watcher) {
// TODO(roth): Implement.
// TODO(juanlishen): Implement.
}
void XdsClient::WatchEndpointData(StringView cluster,
@ -1252,7 +1231,9 @@ void XdsClient::CancelEndpointDataWatch(StringView cluster,
if (it != cluster_state_.endpoint_watchers.end()) {
cluster_state_.endpoint_watchers.erase(it);
}
if (cluster_state_.endpoint_watchers.empty()) chand_->StopAdsCall();
if (chand_ != nullptr && cluster_state_.endpoint_watchers.empty()) {
chand_->StopAdsCall();
}
}
void XdsClient::AddClientStats(StringView cluster,
@ -1270,7 +1251,9 @@ void XdsClient::RemoveClientStats(StringView cluster,
if (it != cluster_state_.client_stats.end()) {
cluster_state_.client_stats.erase(it);
}
if (cluster_state_.client_stats.empty()) chand_->StopLrsCall();
if (chand_ != nullptr && cluster_state_.client_stats.empty()) {
chand_->StopLrsCall();
}
}
void XdsClient::ResetBackoff() {
@ -1280,9 +1263,6 @@ void XdsClient::ResetBackoff() {
}
void XdsClient::NotifyOnError(grpc_error* error) {
// TODO(roth): Once we implement the full LDS flow, it will not be
// necessary to check for the service config watcher being non-null,
// because that will always be true.
if (service_config_watcher_ != nullptr) {
service_config_watcher_->OnError(GRPC_ERROR_REF(error));
}
@ -1295,6 +1275,27 @@ void XdsClient::NotifyOnError(grpc_error* error) {
GRPC_ERROR_UNREF(error);
}
void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) {
XdsClient* self = static_cast<XdsClient*>(arg);
// TODO(roth): When we add support for WeightedClusters, select the
// LB policy based on that functionality.
static const char* json =
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_experimental\":{} }\n"
" ]\n"
"}";
RefCountedPtr<ServiceConfig> service_config =
ServiceConfig::Create(json, &error);
if (error != GRPC_ERROR_NONE) {
self->service_config_watcher_->OnError(error);
} else {
self->service_config_watcher_->OnServiceConfigChanged(
std::move(service_config));
}
self->Unref();
}
void* XdsClient::ChannelArgCopy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Ref().release();
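ChannelArgCopy, truncated above, is one hook of the grpc_arg_pointer_vtable
that MakeChannelArg installs so that copying the channel args takes a ref on
the XdsClient and destroying them drops it. A plausible shape of the
remaining pieces, hedged because the diff cuts off mid-function (the arg key
string and the ChannelArgCmp helper are illustrative, not confirmed by this
diff):

void* XdsClient::ChannelArgCopy(void* p) {
  XdsClient* xds_client = static_cast<XdsClient*>(p);
  xds_client->Ref().release();  // the copied arg holds its own ref
  return p;
}

void XdsClient::ChannelArgDestroy(void* p) {
  XdsClient* xds_client = static_cast<XdsClient*>(p);
  xds_client->Unref();  // balances the ref taken in ChannelArgCopy
}

grpc_arg XdsClient::MakeChannelArg() const {
  static const grpc_arg_pointer_vtable kVtable = {
      ChannelArgCopy, ChannelArgDestroy, ChannelArgCmp};
  return grpc_channel_arg_pointer_create(
      const_cast<char*>("grpc.xds_client"),  // illustrative key name
      const_cast<XdsClient*>(this), &kVtable);
}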

src/core/ext/filters/client_channel/xds/xds_client.h

@ -112,7 +112,61 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
const grpc_channel_args& args);
private:
class ChannelState;
// Contains a channel to the xds server and all the data related to the
// channel. Holds a ref to the xds client object.
// TODO(roth): This is separate from the XdsClient object because it was
// originally designed to be able to swap itself out in case the
// balancer name changed. Now that the balancer name is going to be
// coming from the bootstrap file, we don't really need this level of
// indirection unless we decide to support watching the bootstrap file
// for changes. At some point, if we decide that we're never going to
// need to do that, then we can eliminate this class and move its
// contents directly into the XdsClient class.
class ChannelState : public InternallyRefCounted<ChannelState> {
public:
template <typename T>
class RetryableCall;
class AdsCallState;
class LrsCallState;
ChannelState(RefCountedPtr<XdsClient> xds_client,
const grpc_channel_args& args);
~ChannelState();
void Orphan() override;
grpc_channel* channel() const { return channel_; }
XdsClient* xds_client() const { return xds_client_.get(); }
AdsCallState* ads_calld() const;
LrsCallState* lrs_calld() const;
void MaybeStartAdsCall();
void StopAdsCall();
void MaybeStartLrsCall();
void StopLrsCall();
bool HasActiveAdsCall() const;
void StartConnectivityWatchLocked();
void CancelConnectivityWatchLocked();
private:
class StateWatcher;
// The owning xds client.
RefCountedPtr<XdsClient> xds_client_;
// The channel and its status.
grpc_channel* channel_;
bool shutting_down_ = false;
StateWatcher* watcher_ = nullptr;
// The retryable XDS calls.
OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
};
struct ClusterState {
Map<ClusterWatcherInterface*, UniquePtr<ClusterWatcherInterface>>
@ -127,6 +181,10 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// Sends an error notification to all watchers.
void NotifyOnError(grpc_error* error);
// TODO(juanlishen): Once we implement LDS support, this can be a
// normal method instead of a closure callback.
static void NotifyOnServiceConfig(void* arg, grpc_error* error);
// Channel arg vtable functions.
static void* ChannelArgCopy(void* p);
static void ChannelArgDestroy(void* p);
@ -143,6 +201,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
UniquePtr<char> server_name_;
UniquePtr<ServiceConfigWatcherInterface> service_config_watcher_;
// TODO(juanlishen): Once we implement LDS support, this will no
// longer be needed.
grpc_closure service_config_notify_;
// The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_;

test/cpp/end2end/xds_end2end_test.cc

@ -293,7 +293,7 @@ class AdsServiceImpl : public AdsService {
Status StreamAggregatedResources(ServerContext* context,
Stream* stream) override {
gpr_log(GPR_INFO, "LB[%p]: ADS StreamAggregatedResources starts", this);
gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
[&]() {
{
grpc_core::MutexLock lock(&ads_mu_);
@ -306,7 +306,7 @@ class AdsServiceImpl : public AdsService {
DiscoveryRequest request;
if (!stream->Read(&request)) return;
IncreaseRequestCount();
gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
gpr_log(GPR_INFO, "ADS[%p]: received initial message '%s'", this,
request.DebugString().c_str());
// Send response.
std::vector<ResponseDelayPair> responses_and_delays;
@ -322,7 +322,7 @@ class AdsServiceImpl : public AdsService {
grpc_core::MutexLock lock(&ads_mu_);
ads_cond_.WaitUntil(&ads_mu_, [this] { return ads_done_; });
}();
gpr_log(GPR_INFO, "LB[%p]: ADS StreamAggregatedResources done", this);
gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this);
return Status::OK;
}
@ -343,7 +343,7 @@ class AdsServiceImpl : public AdsService {
NotifyDoneWithAdsCallLocked();
responses_and_delays_.clear();
}
gpr_log(GPR_INFO, "LB[%p]: shut down", this);
gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
}
static DiscoveryResponse BuildResponse(const ResponseArgs& args) {
@ -398,11 +398,11 @@ class AdsServiceImpl : public AdsService {
private:
void SendResponse(Stream* stream, const DiscoveryResponse& response,
int delay_ms) {
gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms);
if (delay_ms > 0) {
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
}
gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this,
response.DebugString().c_str());
IncreaseResponseCount();
stream->Write(response);
@ -424,7 +424,7 @@ class LrsServiceImpl : public LrsService {
client_load_reporting_interval_seconds) {}
Status StreamLoadStats(ServerContext* context, Stream* stream) override {
gpr_log(GPR_INFO, "LB[%p]: LRS StreamLoadStats starts", this);
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
// Read request.
LoadStatsRequest request;
if (stream->Read(&request)) {
@ -442,7 +442,7 @@ class LrsServiceImpl : public LrsService {
// Wait for report.
request.Clear();
if (stream->Read(&request)) {
gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
gpr_log(GPR_INFO, "LRS[%p]: received client load report message '%s'",
this, request.DebugString().c_str());
GPR_ASSERT(request.cluster_stats().size() == 1);
const ClusterStats& cluster_stats = request.cluster_stats()[0];
@ -459,7 +459,7 @@ class LrsServiceImpl : public LrsService {
grpc_core::MutexLock lock(&lrs_mu_);
lrs_cv_.WaitUntil(&lrs_mu_, [this] { return lrs_done; });
}
gpr_log(GPR_INFO, "LB[%p]: LRS done", this);
gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
return Status::OK;
}
@ -474,7 +474,7 @@ class LrsServiceImpl : public LrsService {
grpc_core::MutexLock lock(&lrs_mu_);
NotifyDoneWithLrsCallLocked();
}
gpr_log(GPR_INFO, "LB[%p]: shut down", this);
gpr_log(GPR_INFO, "LRS[%p]: shut down", this);
}
ClientStats* WaitForLoadReport() {
@ -512,7 +512,7 @@ class LrsServiceImpl : public LrsService {
bool load_report_ready_ = false;
};
class XdsEnd2endTest : public ::testing::Test {
class XdsEnd2endTest : public ::testing::TestWithParam<bool> {
protected:
XdsEnd2endTest(size_t num_backends, size_t num_balancers,
int client_load_reporting_interval_seconds)
@ -573,8 +573,7 @@ class XdsEnd2endTest : public ::testing::Test {
void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
void ResetStub(int fallback_timeout = 0, int failover_timeout = 0,
const grpc::string& expected_targets = "",
grpc::string scheme = "") {
const grpc::string& expected_targets = "") {
ChannelArguments args;
// TODO(juanlishen): Add setter to ChannelArguments.
if (fallback_timeout > 0) {
@ -583,12 +582,21 @@ class XdsEnd2endTest : public ::testing::Test {
if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout);
}
// If the parent channel is using the fake resolver, we inject the
// response generator for the parent here, and then SetNextResolution()
// will inject the xds channel's response generator via the parent's
// response generator.
//
// In contrast, if we are using the xds resolver, then the parent
// channel never uses a response generator, and we inject the xds
// channel's response generator here.
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
GetParam() ? lb_channel_response_generator_.get()
: response_generator_.get());
if (!expected_targets.empty()) {
args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
}
if (scheme.empty()) scheme = "fake";
grpc::string scheme = GetParam() ? "xds-experimental" : "fake";
std::ostringstream uri;
uri << scheme << ":///" << kApplicationTargetName_;
// TODO(dgq): templatize tests to run everything using both secure and
@ -633,8 +641,7 @@ class XdsEnd2endTest : public ::testing::Test {
++*num_total;
}
std::tuple<int, int, int> WaitForAllBackends(int num_requests_multiple_of = 1,
size_t start_index = 0,
std::tuple<int, int, int> WaitForAllBackends(size_t start_index = 0,
size_t stop_index = 0) {
int num_ok = 0;
int num_failure = 0;
@ -643,15 +650,11 @@ class XdsEnd2endTest : public ::testing::Test {
while (!SeenAllBackends(start_index, stop_index)) {
SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
}
while (num_total % num_requests_multiple_of != 0) {
SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops);
}
ResetBackendCounters();
gpr_log(GPR_INFO,
"Performed %d warm up requests (a multiple of %d) against the "
"backends. %d succeeded, %d failed, %d dropped.",
num_total, num_requests_multiple_of, num_ok, num_failure,
num_drops);
"Performed %d warm up requests against the backends. "
"%d succeeded, %d failed, %d dropped.",
num_total, num_ok, num_failure, num_drops);
return std::make_tuple(num_ok, num_failure, num_drops);
}
@ -686,6 +689,7 @@ class XdsEnd2endTest : public ::testing::Test {
const char* service_config_json = nullptr,
grpc_core::FakeResolverResponseGenerator*
lb_channel_response_generator = nullptr) {
if (GetParam()) return; // Not used with xds resolver.
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(ports);
@ -919,22 +923,6 @@ class XdsEnd2endTest : public ::testing::Test {
"}";
};
class XdsResolverTest : public XdsEnd2endTest {
public:
XdsResolverTest() : XdsEnd2endTest(0, 0, 0) {}
};
// Tests that if the "xds-experimental" scheme is used, xDS resolver will be
// used.
TEST_F(XdsResolverTest, XdsResolverIsUsed) {
// Use xds-experimental scheme in URI.
ResetStub(0, 0, "", "xds-experimental");
// Send an RPC to trigger resolution.
auto unused_result = SendRpc();
// Xds resolver returns xds_experimental as the LB policy.
EXPECT_EQ("xds_experimental", channel_->GetLoadBalancingPolicyName());
}
class BasicTest : public XdsEnd2endTest {
public:
BasicTest() : XdsEnd2endTest(4, 1, 0) {}
@ -942,7 +930,7 @@ class BasicTest : public XdsEnd2endTest {
// Tests that the balancer sends the correct response to the client, and the
// client sends RPCs to the backends using the default child policy.
TEST_F(BasicTest, Vanilla) {
TEST_P(BasicTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcsPerAddress = 100;
@ -970,7 +958,7 @@ TEST_F(BasicTest, Vanilla) {
// Tests that subchannel sharing works when the same backend is listed multiple
// times.
TEST_F(BasicTest, SameBackendListedMultipleTimes) {
TEST_P(BasicTest, SameBackendListedMultipleTimes) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
// Same backend listed twice.
@ -993,7 +981,7 @@ TEST_F(BasicTest, SameBackendListedMultipleTimes) {
}
// Tests that RPCs will be blocked until a non-empty serverlist is received.
TEST_F(BasicTest, InitiallyEmptyServerlist) {
TEST_P(BasicTest, InitiallyEmptyServerlist) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
@ -1029,7 +1017,7 @@ TEST_F(BasicTest, InitiallyEmptyServerlist) {
// Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
// all the servers are unreachable.
TEST_F(BasicTest, AllServersUnreachableFailFast) {
TEST_P(BasicTest, AllServersUnreachableFailFast) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumUnreachableServers = 5;
@ -1051,7 +1039,7 @@ TEST_F(BasicTest, AllServersUnreachableFailFast) {
// Tests that RPCs fail when the backends are down, and will succeed again after
// the backends are restarted.
TEST_F(BasicTest, BackendsRestart) {
TEST_P(BasicTest, BackendsRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
@ -1071,7 +1059,7 @@ TEST_F(BasicTest, BackendsRestart) {
using SecureNamingTest = BasicTest;
// Tests that secure naming check passes if target name is expected.
TEST_F(SecureNamingTest, TargetNameIsExpected) {
TEST_P(SecureNamingTest, TargetNameIsExpected) {
// TODO(juanlishen): Use separate fake creds for the balancer channel.
ResetStub(0, 0, kApplicationTargetName_ + ";lb");
SetNextResolution({}, kDefaultServiceConfig_.c_str());
@ -1098,7 +1086,7 @@ TEST_F(SecureNamingTest, TargetNameIsExpected) {
}
// Tests that secure naming check fails if target name is unexpected.
TEST_F(SecureNamingTest, TargetNameIsUnexpected) {
TEST_P(SecureNamingTest, TargetNameIsUnexpected) {
gpr_setenv("GRPC_XDS_BOOTSTRAP", "test/cpp/end2end/xds_bootstrap_bad.json");
::testing::FLAGS_gtest_death_test_style = "threadsafe";
// Make sure that we blow up (via abort() from the security connector) when
@ -1117,7 +1105,7 @@ using LocalityMapTest = BasicTest;
// Tests that the localities in a locality map are picked according to their
// weights.
TEST_F(LocalityMapTest, WeightedRoundRobin) {
TEST_P(LocalityMapTest, WeightedRoundRobin) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
@ -1135,7 +1123,7 @@ TEST_F(LocalityMapTest, WeightedRoundRobin) {
});
ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0);
// Wait for both backends to be ready.
WaitForAllBackends(1, 0, 2);
WaitForAllBackends(0, 2);
// Send kNumRpcs RPCs.
CheckRpcSendOk(kNumRpcs);
// The locality picking rates should be roughly equal to the expectation.
@ -1161,7 +1149,7 @@ TEST_F(LocalityMapTest, WeightedRoundRobin) {
// Tests that the locality map can work properly even when it contains a large
// number of localities.
TEST_F(LocalityMapTest, StressTest) {
TEST_P(LocalityMapTest, StressTest) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumLocalities = 100;
@ -1196,7 +1184,7 @@ TEST_F(LocalityMapTest, StressTest) {
// Tests that the localities in a locality map are picked correctly after update
// (addition, modification, deletion).
TEST_F(LocalityMapTest, UpdateMap) {
TEST_P(LocalityMapTest, UpdateMap) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 1000;
@ -1231,7 +1219,7 @@ TEST_F(LocalityMapTest, UpdateMap) {
});
ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 5000);
// Wait for the first 3 backends to be ready.
WaitForAllBackends(1, 0, 3);
WaitForAllBackends(0, 3);
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
// Send kNumRpcs RPCs.
CheckRpcSendOk(kNumRpcs);
@ -1289,11 +1277,11 @@ TEST_F(LocalityMapTest, UpdateMap) {
class FailoverTest : public BasicTest {
public:
FailoverTest() { ResetStub(0, 100, "", ""); }
FailoverTest() { ResetStub(0, 100, ""); }
};
// Localities with the highest priority are used when multiple priorities exist.
TEST_F(FailoverTest, ChooseHighestPriority) {
TEST_P(FailoverTest, ChooseHighestPriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
@ -1314,7 +1302,7 @@ TEST_F(FailoverTest, ChooseHighestPriority) {
// If the higher priority localities are not reachable, failover to the highest
// priority among the rest.
TEST_F(FailoverTest, Failover) {
TEST_P(FailoverTest, Failover) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
@ -1338,7 +1326,7 @@ TEST_F(FailoverTest, Failover) {
// If a locality with higher priority than the current one becomes ready,
// switch to it.
TEST_F(FailoverTest, SwitchBackToHigherPriority) {
TEST_P(FailoverTest, SwitchBackToHigherPriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 100;
@ -1367,7 +1355,7 @@ TEST_F(FailoverTest, SwitchBackToHigherPriority) {
// The first update only contains unavailable priorities. The second update
// contains available priorities.
TEST_F(FailoverTest, UpdateInitialUnavailable) {
TEST_P(FailoverTest, UpdateInitialUnavailable) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
@ -1402,7 +1390,7 @@ TEST_F(FailoverTest, UpdateInitialUnavailable) {
// Tests that after the localities' priorities are updated, we still choose the
// highest READY priority with the updated localities.
TEST_F(FailoverTest, UpdatePriority) {
TEST_P(FailoverTest, UpdatePriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 100;
@ -1435,7 +1423,7 @@ TEST_F(FailoverTest, UpdatePriority) {
using DropTest = BasicTest;
// Tests that RPCs are dropped according to the drop config.
TEST_F(DropTest, Vanilla) {
TEST_P(DropTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
@ -1481,7 +1469,7 @@ TEST_F(DropTest, Vanilla) {
}
// Tests that drop config is converted correctly from per hundred.
TEST_F(DropTest, DropPerHundred) {
TEST_P(DropTest, DropPerHundred) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
@ -1522,7 +1510,7 @@ TEST_F(DropTest, DropPerHundred) {
}
// Tests that drop config is converted correctly from per ten thousand.
TEST_F(DropTest, DropPerTenThousand) {
TEST_P(DropTest, DropPerTenThousand) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
@ -1563,7 +1551,7 @@ TEST_F(DropTest, DropPerTenThousand) {
}
// Tests that drop is working correctly after update.
TEST_F(DropTest, Update) {
TEST_P(DropTest, Update) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 1000;
@ -1659,7 +1647,7 @@ TEST_F(DropTest, Update) {
}
// Tests that all the RPCs are dropped if any drop category drops 100%.
TEST_F(DropTest, DropAll) {
TEST_P(DropTest, DropAll) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 1000;
@ -1688,7 +1676,7 @@ using FallbackTest = BasicTest;
// Tests that RPCs are handled by the fallback backends before the serverlist is
// received, but will be handled by the serverlist after it's received.
TEST_F(FallbackTest, Vanilla) {
TEST_P(FallbackTest, Vanilla) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendsInResolution = backends_.size() / 2;
@ -1703,7 +1691,7 @@ TEST_F(FallbackTest, Vanilla) {
ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
WaitForAllBackends(0 /* start_index */,
kNumBackendsInResolution /* stop_index */);
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(kNumBackendsInResolution);
@ -1718,8 +1706,7 @@ TEST_F(FallbackTest, Vanilla) {
}
// Wait until the serverlist reception has been processed and all backends
// in the serverlist are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumBackendsInResolution /* start_index */);
WaitForAllBackends(kNumBackendsInResolution /* start_index */);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
CheckRpcSendOk(backends_.size() - kNumBackendsInResolution);
gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
@ -1738,7 +1725,7 @@ TEST_F(FallbackTest, Vanilla) {
// Tests that RPCs are handled by the updated fallback backends before
// serverlist is received,
TEST_F(FallbackTest, Update) {
TEST_P(FallbackTest, Update) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendsInResolution = backends_.size() / 3;
@ -1755,7 +1742,7 @@ TEST_F(FallbackTest, Update) {
ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
WaitForAllBackends(0 /* start_index */,
kNumBackendsInResolution /* stop_index */);
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
CheckRpcSendOk(kNumBackendsInResolution);
@ -1774,8 +1761,7 @@ TEST_F(FallbackTest, Update) {
kDefaultServiceConfig_.c_str());
// Wait until the resolution update has been processed and all the new
// fallback backends are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumBackendsInResolution /* start_index */,
WaitForAllBackends(kNumBackendsInResolution /* start_index */,
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* stop_index */);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
@ -1796,9 +1782,8 @@ TEST_F(FallbackTest, Update) {
}
// Wait until the serverlist reception has been processed and all backends
// in the serverlist are reachable.
WaitForAllBackends(1 /* num_requests_multiple_of */,
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* start_index */);
WaitForAllBackends(kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* start_index */);
gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
CheckRpcSendOk(backends_.size() - kNumBackendsInResolution -
kNumBackendsInResolutionUpdate);
@ -1819,7 +1804,7 @@ TEST_F(FallbackTest, Update) {
}
// Tests that fallback will kick in immediately if the balancer channel fails.
TEST_F(FallbackTest, FallbackEarlyWhenBalancerChannelFails) {
TEST_P(FallbackTest, FallbackEarlyWhenBalancerChannelFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return an unreachable balancer and one fallback backend.
@ -1832,7 +1817,7 @@ TEST_F(FallbackTest, FallbackEarlyWhenBalancerChannelFails) {
}
// Tests that fallback will kick in immediately if the balancer call fails.
TEST_F(FallbackTest, FallbackEarlyWhenBalancerCallFails) {
TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return one balancer and one fallback backend.
@ -1848,7 +1833,7 @@ TEST_F(FallbackTest, FallbackEarlyWhenBalancerCallFails) {
// Tests that fallback mode is entered if balancer response is received but the
// backends can't be reached.
TEST_F(FallbackTest, FallbackIfResponseReceivedButChildNotReady) {
TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) {
const int kFallbackTimeoutMs = 500 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
@ -1866,7 +1851,7 @@ TEST_F(FallbackTest, FallbackIfResponseReceivedButChildNotReady) {
// Tests that fallback mode is exited if the balancer tells the client to drop
// all the calls.
TEST_F(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()});
@ -1890,7 +1875,7 @@ TEST_F(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
}
// Tests that fallback mode is exited if the child policy becomes ready.
TEST_F(FallbackTest, FallbackModeIsExitedAfterChildRready) {
TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) {
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()});
@ -1929,7 +1914,7 @@ class BalancerUpdateTest : public XdsEnd2endTest {
// Tests that the old LB call is still used after the balancer address update as
// long as that call is still alive.
TEST_F(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
@ -1982,7 +1967,7 @@ TEST_F(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
// of LBs as the one in SetUp() in order to verify that the LB channel inside
// xds keeps the initial connection (which by definition is also present in the
// update).
TEST_F(BalancerUpdateTest, Repeated) {
TEST_P(BalancerUpdateTest, Repeated) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
@ -2047,7 +2032,7 @@ TEST_F(BalancerUpdateTest, Repeated) {
// Tests that if the balancer is down, the RPCs will still be sent to the
// backends according to the last balancer response, until a new balancer is
// reachable.
TEST_F(BalancerUpdateTest, DeadUpdate) {
TEST_P(BalancerUpdateTest, DeadUpdate) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({balancers_[0]->port()});
AdsServiceImpl::ResponseArgs args({
@ -2115,9 +2100,9 @@ TEST_F(BalancerUpdateTest, DeadUpdate) {
// The re-resolution tests are deferred because they rely on fallback mode,
// which is not yet supported.
// TODO(juanlishen): Add TEST_F(BalancerUpdateTest, ReresolveDeadBackend).
// TODO(juanlishen): Add TEST_P(BalancerUpdateTest, ReresolveDeadBackend).
// TODO(juanlishen): Add TEST_F(UpdatesWithClientLoadReportingTest,
// TODO(juanlishen): Add TEST_P(UpdatesWithClientLoadReportingTest,
// ReresolveDeadBalancer)
class ClientLoadReportingTest : public XdsEnd2endTest {
@ -2126,7 +2111,7 @@ class ClientLoadReportingTest : public XdsEnd2endTest {
};
// Tests that the load report received at the balancer is correct.
TEST_F(ClientLoadReportingTest, Vanilla) {
TEST_P(ClientLoadReportingTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumRpcsPerAddress = 100;
@ -2167,7 +2152,7 @@ TEST_F(ClientLoadReportingTest, Vanilla) {
// Tests that if the balancer restarts, the client load report contains the
// stats before and after the restart correctly.
TEST_F(ClientLoadReportingTest, BalancerRestart) {
TEST_P(ClientLoadReportingTest, BalancerRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumBackendsFirstPass = backends_.size() / 2;
@ -2182,7 +2167,7 @@ TEST_F(ClientLoadReportingTest, BalancerRestart) {
int num_failure = 0;
int num_drops = 0;
std::tie(num_ok, num_failure, num_drops) =
WaitForAllBackends(/* num_requests_multiple_of */ 1, /* start_index */ 0,
WaitForAllBackends(/* start_index */ 0,
/* stop_index */ kNumBackendsFirstPass);
ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
EXPECT_EQ(static_cast<size_t>(num_ok),
@ -2192,15 +2177,19 @@ TEST_F(ClientLoadReportingTest, BalancerRestart) {
EXPECT_EQ(0U, client_stats->total_dropped_requests());
// Shut down the balancer.
balancers_[0]->Shutdown();
// Send 1 more request per backend. This will continue using the
// last serverlist we received from the balancer before it was shut down.
// We should continue using the last EDS response we received from the
// balancer before it was shut down.
// Note: We need to use WaitForAllBackends() here instead of just
// CheckRpcSendOk(kNumBackendsFirstPass), because when the balancer
// shuts down, the XdsClient will generate an error to the
// ServiceConfigWatcher, which will cause the xds resolver to send a
// no-op update to the LB policy. When this update gets down to the
// round_robin child policy for the locality, it will generate a new
// subchannel list, which resets the start index randomly. So we need
// to be a little more permissive here to avoid spurious failures.
ResetBackendCounters();
CheckRpcSendOk(kNumBackendsFirstPass);
int num_started = kNumBackendsFirstPass;
// Each backend should have gotten 1 request.
for (size_t i = 0; i < kNumBackendsFirstPass; ++i) {
EXPECT_EQ(1UL, backends_[i]->backend_service()->request_count());
}
int num_started = std::get<0>(WaitForAllBackends(
/* start_index */ 0, /* stop_index */ kNumBackendsFirstPass));
// Now restart the balancer, this time pointing to the new backends.
balancers_[0]->Start(server_host_);
args = AdsServiceImpl::ResponseArgs({
@ -2210,8 +2199,7 @@ TEST_F(ClientLoadReportingTest, BalancerRestart) {
// Wait for queries to start going to one of the new backends.
// This tells us that we're now using the new serverlist.
std::tie(num_ok, num_failure, num_drops) =
WaitForAllBackends(/* num_requests_multiple_of */ 1,
/* start_index */ kNumBackendsFirstPass);
WaitForAllBackends(/* start_index */ kNumBackendsFirstPass);
num_started += num_ok + num_failure + num_drops;
// Send one RPC per backend.
CheckRpcSendOk(kNumBackendsSecondPass);
@ -2230,7 +2218,7 @@ class ClientLoadReportingWithDropTest : public XdsEnd2endTest {
};
// Tests that the drop stats are correctly reported by client load reporting.
TEST_F(ClientLoadReportingWithDropTest, Vanilla) {
TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 3000;
@ -2293,6 +2281,29 @@ TEST_F(ClientLoadReportingWithDropTest, Vanilla) {
EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
}
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BasicTest, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, SecureNamingTest, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, LocalityMapTest, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FailoverTest, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, DropTest, ::testing::Bool());
// Fallback does not work with xds resolver.
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FallbackTest,
::testing::Values(false));
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BalancerUpdateTest,
::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingTest,
::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingWithDropTest,
::testing::Bool());
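One consequence of the TEST_F to TEST_P conversion above: gtest now expands
each test once per parameter value, so a case like BasicTest.Vanilla runs as
UsesXdsResolver/BasicTest.Vanilla/0 (fake resolver) and .../1 (xds resolver).
If friendlier instance names were ever wanted, gtest's optional name
generator could supply them; this is purely illustrative and not part of the
change:

INSTANTIATE_TEST_SUITE_P(
    UsesXdsResolver, BasicTest, ::testing::Bool(),
    [](const ::testing::TestParamInfo<bool>& info) {
      return info.param ? std::string("XdsResolver")
                        : std::string("FakeResolver");
    });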
} // namespace
} // namespace testing
} // namespace grpc
