diff --git a/BUILD b/BUILD index e72b9e50cd5..026045f076b 100644 --- a/BUILD +++ b/BUILD @@ -320,6 +320,7 @@ grpc_cc_library( "grpc_common", "grpc_lb_policy_grpclb", "grpc_lb_policy_xds", + "grpc_resolver_xds", ], ) @@ -336,6 +337,7 @@ grpc_cc_library( "grpc_common", "grpc_lb_policy_grpclb_secure", "grpc_lb_policy_xds_secure", + "grpc_resolver_xds_secure", "grpc_secure", "grpc_transport_chttp2_client_secure", "grpc_transport_chttp2_server_secure", @@ -994,7 +996,6 @@ grpc_cc_library( "grpc_resolver_fake", "grpc_resolver_dns_native", "grpc_resolver_sockaddr", - "grpc_resolver_xds", "grpc_transport_chttp2_client_insecure", "grpc_transport_chttp2_server_insecure", "grpc_transport_inproc", @@ -1581,6 +1582,20 @@ grpc_cc_library( deps = [ "grpc_base", "grpc_client_channel", + "grpc_xds_client", + ], +) + +grpc_cc_library( + name = "grpc_resolver_xds_secure", + srcs = [ + "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + "grpc_xds_client_secure", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index ebd1eb22a6e..51eb38c9e49 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2821,13 +2821,6 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc - src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c - src/core/ext/filters/client_channel/lb_policy/xds/xds.cc src/core/ext/filters/client_channel/xds/xds_api.cc src/core/ext/filters/client_channel/xds/xds_bootstrap.cc src/core/ext/filters/client_channel/xds/xds_channel.cc @@ -2853,6 +2846,13 @@ add_library(grpc_unsecure src/core/ext/upb-generated/envoy/api/v2/core/protocol.upb.c src/core/ext/upb-generated/envoy/type/percent.upb.c src/core/ext/upb-generated/envoy/type/range.upb.c + src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc + src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc + src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c + src/core/ext/filters/client_channel/lb_policy/xds/xds.cc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/census/grpc_context.cc diff --git a/Makefile b/Makefile index b253ff170f5..41737181605 100644 --- a/Makefile +++ b/Makefile @@ -5320,13 +5320,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \ src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc \ src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \ - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc \ - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \ - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \ - src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \ - src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ src/core/ext/filters/client_channel/xds/xds_api.cc \ src/core/ext/filters/client_channel/xds/xds_bootstrap.cc \ src/core/ext/filters/client_channel/xds/xds_channel.cc \ @@ -5352,6 +5345,13 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/upb-generated/envoy/api/v2/core/protocol.upb.c \ src/core/ext/upb-generated/envoy/type/percent.upb.c \ src/core/ext/upb-generated/envoy/type/range.upb.c \ + src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc \ + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \ + src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \ + src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c \ + src/core/ext/filters/client_channel/lb_policy/xds/xds.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/census/grpc_context.cc \ diff --git a/build.yaml b/build.yaml index 62f19f6f219..e7ec234b673 100644 --- a/build.yaml +++ b/build.yaml @@ -1230,6 +1230,16 @@ filegroups: uses: - grpc_base - grpc_client_channel + - grpc_xds_client +- name: grpc_resolver_xds_secure + src: + - src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc + plugin: grpc_resolver_xds + uses: + - grpc_base + - grpc_client_channel + - grpc_secure + - grpc_xds_client_secure - name: grpc_secure public_headers: - include/grpc/grpc_security.h @@ -1678,7 +1688,7 @@ libs: - grpc_resolver_dns_native - grpc_resolver_sockaddr - grpc_resolver_fake - - grpc_resolver_xds + - grpc_resolver_xds_secure - grpc_secure - census - grpc_client_idle_filter diff --git a/grpc.gyp b/grpc.gyp index 5cbdeef8a68..53636aef3aa 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1416,13 +1416,6 @@ 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc', - 'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c', - 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', 'src/core/ext/filters/client_channel/xds/xds_api.cc', 'src/core/ext/filters/client_channel/xds/xds_bootstrap.cc', 'src/core/ext/filters/client_channel/xds/xds_channel.cc', @@ -1448,6 +1441,13 @@ 'src/core/ext/upb-generated/envoy/api/v2/core/protocol.upb.c', 'src/core/ext/upb-generated/envoy/type/percent.upb.c', 'src/core/ext/upb-generated/envoy/type/range.upb.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc', + 'src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c', + 'src/core/ext/filters/client_channel/lb_policy/xds/xds.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/census/grpc_context.cc', diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 55d3516a357..b9ae566aa1e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -373,6 +373,11 @@ class XdsLb : public LoadBalancingPolicy { const char* name, const grpc_channel_args* args); void MaybeExitFallbackMode(); + XdsClient* xds_client() const { + return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get() + : xds_client_.get(); + } + // Name of the backend server to connect to. const char* server_name_ = nullptr; @@ -382,7 +387,11 @@ class XdsLb : public LoadBalancingPolicy { // Internal state. bool shutting_down_ = false; - // The xds client. + // The xds client and endpoint watcher. + // If we get the XdsClient from the channel, we store it in + // xds_client_from_channel_; if we create it ourselves, we store it in + // xds_client_. + RefCountedPtr xds_client_from_channel_; OrphanablePtr xds_client_; // A pointer to the endpoint watcher, to be used when cancelling the watch. // Note that this is not owned, so this pointer must never be derefernced. @@ -580,6 +589,10 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { : xds_policy_(std::move(xds_policy)) {} void OnEndpointChanged(EdsUpdate update) override { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Received EDS update from xds client", + xds_policy_.get()); + } // If the balancer tells us to drop all the calls, we should exit fallback // mode immediately. if (update.drop_all) xds_policy_->MaybeExitFallbackMode(); @@ -647,6 +660,7 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)), + xds_client_from_channel_(XdsClient::GetFromChannelArgs(*args.args)), lb_fallback_timeout_ms_(grpc_channel_args_find_integer( args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})), @@ -657,6 +671,11 @@ XdsLb::XdsLb(Args args) args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, {GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})), priority_list_(this) { + if (xds_client_from_channel_ != nullptr && + GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this, + xds_client_from_channel_.get()); + } // Record server name. const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -699,12 +718,11 @@ void XdsLb::ShutdownLocked() { pending_fallback_policy_.reset(); // Cancel the endpoint watch here instead of in our dtor, because the // watcher holds a ref to us. - if (xds_client_ != nullptr) { - xds_client_->CancelEndpointDataWatch(StringView(server_name_), - endpoint_watcher_); - xds_client_->RemoveClientStats(StringView(server_name_), &client_stats_); - xds_client_.reset(); - } + xds_client()->CancelEndpointDataWatch(StringView(server_name_), + endpoint_watcher_); + xds_client()->RemoveClientStats(StringView(server_name_), &client_stats_); + xds_client_from_channel_.reset(); + xds_client_.reset(); } // @@ -712,9 +730,9 @@ void XdsLb::ShutdownLocked() { // void XdsLb::ResetBackoffLocked() { - // TODO(roth): When we instantiate the XdsClient in the resolver - // instead of in this LB policy, this should be done in the resolver - // instead of here. + // When the XdsClient is instantiated in the resolver instead of in this + // LB policy, this is done via the resolver, so we don't need to do it + // for xds_client_from_channel_ here. if (xds_client_ != nullptr) xds_client_->ResetBackoff(); priority_list_.ResetBackoffLocked(); if (fallback_policy_ != nullptr) { @@ -726,7 +744,10 @@ void XdsLb::ResetBackoffLocked() { } void XdsLb::UpdateLocked(UpdateArgs args) { - const bool is_initial_update = xds_client_ == nullptr; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Received update", this); + } + const bool is_initial_update = args_ == nullptr; // Update config. auto* xds_config = static_cast(args.config.get()); child_policy_config_ = xds_config->child_policy(); @@ -737,32 +758,32 @@ void XdsLb::UpdateLocked(UpdateArgs args) { grpc_channel_args_destroy(args_); args_ = args.args; args.args = nullptr; - // Create an xds client if we don't have one yet. - if (xds_client_ == nullptr) { - grpc_error* error = GRPC_ERROR_NONE; - xds_client_ = MakeOrphanable( - combiner(), interested_parties(), StringView(server_name_), - nullptr /* service config watcher */, *args_, &error); - // TODO(roth): When we move instantiation of the XdsClient into the - // xds resolver, add proper error handling there. - GPR_ASSERT(error == GRPC_ERROR_NONE); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, "[xdslb %p] Created xds client %p", this, - xds_client_.get()); - } - endpoint_watcher_ = New(Ref()); - xds_client_->WatchEndpointData( - StringView(server_name_), - UniquePtr(endpoint_watcher_)); - xds_client_->AddClientStats(StringView(server_name_), &client_stats_); - } // Update priority list. priority_list_.UpdateLocked(); // Update the existing fallback policy. The fallback policy config and/or the // fallback addresses may be new. if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); - // If this is the initial update, start the fallback-at-startup checks. if (is_initial_update) { + // Initialize XdsClient. + if (xds_client_from_channel_ == nullptr) { + grpc_error* error = GRPC_ERROR_NONE; + xds_client_ = MakeOrphanable( + combiner(), interested_parties(), StringView(server_name_), + nullptr /* service config watcher */, *args_, &error); + // TODO(roth): If we decide that we care about fallback mode, add + // proper error handling here. + GPR_ASSERT(error == GRPC_ERROR_NONE); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Created xds client %p", this, + xds_client_.get()); + } + } + auto watcher = MakeUnique(Ref()); + endpoint_watcher_ = watcher.get(); + xds_client()->WatchEndpointData(StringView(server_name_), + std::move(watcher)); + xds_client()->AddClientStats(StringView(server_name_), &client_stats_); + // Start fallback-at-startup checks. grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this, diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index eac64b44a98..41dfbcde911 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -19,39 +19,84 @@ #include #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/ext/filters/client_channel/xds/xds_client.h" +#include "src/core/lib/gprpp/string_view.h" namespace grpc_core { namespace { +// +// XdsResolver +// + class XdsResolver : public Resolver { public: explicit XdsResolver(ResolverArgs args) : Resolver(args.combiner, std::move(args.result_handler)), - args_(grpc_channel_args_copy(args.args)) {} + args_(grpc_channel_args_copy(args.args)), + interested_parties_(args.pollset_set) { + char* path = args.uri->path; + if (path[0] == '/') ++path; + server_name_.reset(gpr_strdup(path)); + } + ~XdsResolver() override { grpc_channel_args_destroy(args_); } void StartLocked() override; - void ShutdownLocked() override{}; + void ShutdownLocked() override { xds_client_.reset(); } private: + class ServiceConfigWatcher : public XdsClient::ServiceConfigWatcherInterface { + public: + explicit ServiceConfigWatcher(RefCountedPtr resolver) + : resolver_(std::move(resolver)) {} + void OnServiceConfigChanged( + RefCountedPtr service_config) override; + void OnError(grpc_error* error) override; + + private: + RefCountedPtr resolver_; + }; + + UniquePtr server_name_; const grpc_channel_args* args_; + grpc_pollset_set* interested_parties_; + OrphanablePtr xds_client_; }; -void XdsResolver::StartLocked() { - static const char* service_config = - "{\n" - " \"loadBalancingConfig\":[\n" - " { \"xds_experimental\":{} }\n" - " ]\n" - "}"; +void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged( + RefCountedPtr service_config) { + grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg(); Result result; - result.args = args_; - args_ = nullptr; + result.args = + grpc_channel_args_copy_and_add(resolver_->args_, &xds_client_arg, 1); + result.service_config = std::move(service_config); + resolver_->result_handler()->ReturnResult(std::move(result)); +} + +void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) { + grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg(); + Result result; + result.args = + grpc_channel_args_copy_and_add(resolver_->args_, &xds_client_arg, 1); + result.service_config_error = error; + resolver_->result_handler()->ReturnResult(std::move(result)); +} + +void XdsResolver::StartLocked() { grpc_error* error = GRPC_ERROR_NONE; - result.service_config = ServiceConfig::Create(service_config, &error); - result_handler()->ReturnResult(std::move(result)); + xds_client_ = MakeOrphanable( + combiner(), interested_parties_, StringView(server_name_.get()), + MakeUnique(Ref()), *args_, &error); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, + "Failed to create xds client -- channel will remain in " + "TRANSIENT_FAILURE: %s", + grpc_error_string(error)); + result_handler()->ReturnError(error); + } } // diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index fdc2e74c953..c168da3a9f5 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -68,222 +68,181 @@ namespace grpc_core { TraceFlag grpc_xds_client_trace(false, "xds_client"); -// Contains a channel to the xds server and all the data related to the -// channel. Holds a ref to the xds client object. -// TODO(roth): This is separate from the XdsClient object because it was -// originally designed to be able to swap itself out in case the -// balancer name changed. Now that the balancer name is going to be -// coming from the bootstrap file, we don't really need this level of -// indirection unless we decide to support watching the bootstrap file -// for changes. At some point, if we decide that we're never going to -// need to do that, then we can eliminate this class and move its -// contents directly into the XdsClient class. -class XdsClient::ChannelState : public InternallyRefCounted { +// +// Internal class declarations +// + +// An xds call wrapper that can restart a call upon failure. Holds a ref to +// the xds channel. The template parameter is the kind of wrapped xds call. +template +class XdsClient::ChannelState::RetryableCall + : public InternallyRefCounted> { public: - // An xds call wrapper that can restart a call upon failure. Holds a ref to - // the xds channel. The template parameter is the kind of wrapped xds call. - template - class RetryableCall : public InternallyRefCounted> { - public: - explicit RetryableCall(RefCountedPtr chand); + explicit RetryableCall(RefCountedPtr chand); - void Orphan() override; + void Orphan() override; - void OnCallFinishedLocked(); + void OnCallFinishedLocked(); - T* calld() const { return calld_.get(); } - ChannelState* chand() const { return chand_.get(); } + T* calld() const { return calld_.get(); } + ChannelState* chand() const { return chand_.get(); } - private: - void StartNewCallLocked(); - void StartRetryTimerLocked(); - static void OnRetryTimerLocked(void* arg, grpc_error* error); - - // The wrapped call that talks to the xds server. It's instantiated - // every time we start a new call. It's null during call retry backoff. - OrphanablePtr calld_; - // The owning xds channel. - RefCountedPtr chand_; - - // Retry state. - BackOff backoff_; - grpc_timer retry_timer_; - grpc_closure on_retry_timer_; - bool retry_timer_callback_pending_ = false; - - bool shutting_down_ = false; - }; + bool IsCurrentCallOnChannel() const; - // Contains an ADS call to the xds server. - class AdsCallState : public InternallyRefCounted { - public: - // The ctor and dtor should not be used directly. - explicit AdsCallState(RefCountedPtr> parent); - ~AdsCallState() override; + private: + void StartNewCallLocked(); + void StartRetryTimerLocked(); + static void OnRetryTimerLocked(void* arg, grpc_error* error); + + // The wrapped xds call that talks to the xds server. It's instantiated + // every time we start a new call. It's null during call retry backoff. + OrphanablePtr calld_; + // The owning xds channel. + RefCountedPtr chand_; + + // Retry state. + BackOff backoff_; + grpc_timer retry_timer_; + grpc_closure on_retry_timer_; + bool retry_timer_callback_pending_ = false; - void Orphan() override; + bool shutting_down_ = false; +}; - RetryableCall* parent() const { return parent_.get(); } - ChannelState* chand() const { return parent_->chand(); } - XdsClient* xds_client() const { return chand()->xds_client(); } - bool seen_response() const { return seen_response_; } +// Contains an ADS call to the xds server. +class XdsClient::ChannelState::AdsCallState + : public InternallyRefCounted { + public: + // The ctor and dtor should not be used directly. + explicit AdsCallState(RefCountedPtr> parent); + ~AdsCallState() override; - private: - static void OnResponseReceivedLocked(void* arg, grpc_error* error); - static void OnStatusReceivedLocked(void* arg, grpc_error* error); + void Orphan() override; - bool IsCurrentCallOnChannel() const; + RetryableCall* parent() const { return parent_.get(); } + ChannelState* chand() const { return parent_->chand(); } + XdsClient* xds_client() const { return chand()->xds_client(); } + bool seen_response() const { return seen_response_; } - // The owning RetryableCall<>. - RefCountedPtr> parent_; - bool seen_response_ = false; + private: + static void OnResponseReceivedLocked(void* arg, grpc_error* error); + static void OnStatusReceivedLocked(void* arg, grpc_error* error); - // Always non-NULL. - grpc_call* call_; + bool IsCurrentCallOnChannel() const; - // recv_initial_metadata - grpc_metadata_array initial_metadata_recv_; + // The owning RetryableCall<>. + RefCountedPtr> parent_; + bool seen_response_ = false; - // send_message - grpc_byte_buffer* send_message_payload_ = nullptr; + // Always non-NULL. + grpc_call* call_; - // recv_message - grpc_byte_buffer* recv_message_payload_ = nullptr; - grpc_closure on_response_received_; + // recv_initial_metadata + grpc_metadata_array initial_metadata_recv_; - // recv_trailing_metadata - grpc_metadata_array trailing_metadata_recv_; - grpc_status_code status_code_; - grpc_slice status_details_; - grpc_closure on_status_received_; - }; + // send_message + grpc_byte_buffer* send_message_payload_ = nullptr; - // Contains an LRS call to the xds server. - class LrsCallState : public InternallyRefCounted { - public: - // The ctor and dtor should not be used directly. - explicit LrsCallState(RefCountedPtr> parent); - ~LrsCallState() override; + // recv_message + grpc_byte_buffer* recv_message_payload_ = nullptr; + grpc_closure on_response_received_; - void Orphan() override; + // recv_trailing_metadata + grpc_metadata_array trailing_metadata_recv_; + grpc_status_code status_code_; + grpc_slice status_details_; + grpc_closure on_status_received_; +}; - void MaybeStartReportingLocked(); +// Contains an LRS call to the xds server. +class XdsClient::ChannelState::LrsCallState + : public InternallyRefCounted { + public: + // The ctor and dtor should not be used directly. + explicit LrsCallState(RefCountedPtr> parent); + ~LrsCallState() override; - RetryableCall* parent() { return parent_.get(); } - ChannelState* chand() const { return parent_->chand(); } - XdsClient* xds_client() const { return chand()->xds_client(); } - bool seen_response() const { return seen_response_; } + void Orphan() override; - private: - // Reports client-side load stats according to a fixed interval. - class Reporter : public InternallyRefCounted { - public: - Reporter(RefCountedPtr parent, grpc_millis report_interval) - : parent_(std::move(parent)), report_interval_(report_interval) { - GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); - GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); - ScheduleNextReportLocked(); - } + void MaybeStartReportingLocked(); - void Orphan() override; + RetryableCall* parent() { return parent_.get(); } + ChannelState* chand() const { return parent_->chand(); } + XdsClient* xds_client() const { return chand()->xds_client(); } + bool seen_response() const { return seen_response_; } - private: - void ScheduleNextReportLocked(); - static void OnNextReportTimerLocked(void* arg, grpc_error* error); - void SendReportLocked(); - static void OnReportDoneLocked(void* arg, grpc_error* error); + private: + // Reports client-side load stats according to a fixed interval. + class Reporter : public InternallyRefCounted { + public: + Reporter(RefCountedPtr parent, grpc_millis report_interval) + : parent_(std::move(parent)), report_interval_(report_interval) { + GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this, + grpc_combiner_scheduler(xds_client()->combiner_)); + GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this, + grpc_combiner_scheduler(xds_client()->combiner_)); + ScheduleNextReportLocked(); + } - bool IsCurrentReporterOnCall() const { - return this == parent_->reporter_.get(); - } - XdsClient* xds_client() const { return parent_->xds_client(); } - - // The owning LRS call. - RefCountedPtr parent_; - - // The load reporting state. - const grpc_millis report_interval_; - bool last_report_counters_were_zero_ = false; - bool next_report_timer_callback_pending_ = false; - grpc_timer next_report_timer_; - grpc_closure on_next_report_timer_; - grpc_closure on_report_done_; - }; - - static void OnInitialRequestSentLocked(void* arg, grpc_error* error); - static void OnResponseReceivedLocked(void* arg, grpc_error* error); - static void OnStatusReceivedLocked(void* arg, grpc_error* error); - - bool IsCurrentCallOnChannel() const; - - // The owning RetryableCall<>. - RefCountedPtr> parent_; - bool seen_response_ = false; - - // Always non-NULL. - grpc_call* call_; - - // recv_initial_metadata - grpc_metadata_array initial_metadata_recv_; - - // send_message - grpc_byte_buffer* send_message_payload_ = nullptr; - grpc_closure on_initial_request_sent_; - - // recv_message - grpc_byte_buffer* recv_message_payload_ = nullptr; - grpc_closure on_response_received_; - - // recv_trailing_metadata - grpc_metadata_array trailing_metadata_recv_; - grpc_status_code status_code_; - grpc_slice status_details_; - grpc_closure on_status_received_; - - // Load reporting state. - UniquePtr cluster_name_; - grpc_millis load_reporting_interval_ = 0; - OrphanablePtr reporter_; - }; + void Orphan() override; - ChannelState(RefCountedPtr xds_client, - const grpc_channel_args& args); - ~ChannelState(); + private: + void ScheduleNextReportLocked(); + static void OnNextReportTimerLocked(void* arg, grpc_error* error); + void SendReportLocked(); + static void OnReportDoneLocked(void* arg, grpc_error* error); - void Orphan() override; + bool IsCurrentReporterOnCall() const { + return this == parent_->reporter_.get(); + } + XdsClient* xds_client() const { return parent_->xds_client(); } + + // The owning LRS call. + RefCountedPtr parent_; + + // The load reporting state. + const grpc_millis report_interval_; + bool last_report_counters_were_zero_ = false; + bool next_report_timer_callback_pending_ = false; + grpc_timer next_report_timer_; + grpc_closure on_next_report_timer_; + grpc_closure on_report_done_; + }; - grpc_channel* channel() const { return channel_; } - XdsClient* xds_client() const { return xds_client_.get(); } - AdsCallState* ads_calld() const { return ads_calld_->calld(); } - LrsCallState* lrs_calld() const { return lrs_calld_->calld(); } + static void OnInitialRequestSentLocked(void* arg, grpc_error* error); + static void OnResponseReceivedLocked(void* arg, grpc_error* error); + static void OnStatusReceivedLocked(void* arg, grpc_error* error); - void MaybeStartAdsCall(); - void StopAdsCall(); + bool IsCurrentCallOnChannel() const; - void MaybeStartLrsCall(); - void StopLrsCall(); + // The owning RetryableCall<>. + RefCountedPtr> parent_; + bool seen_response_ = false; - bool HasActiveAdsCall() const { return ads_calld_->calld() != nullptr; } + // Always non-NULL. + grpc_call* call_; - void StartConnectivityWatchLocked(); - void CancelConnectivityWatchLocked(); + // recv_initial_metadata + grpc_metadata_array initial_metadata_recv_; - private: - class StateWatcher; + // send_message + grpc_byte_buffer* send_message_payload_ = nullptr; + grpc_closure on_initial_request_sent_; - // The owning xds client. - RefCountedPtr xds_client_; + // recv_message + grpc_byte_buffer* recv_message_payload_ = nullptr; + grpc_closure on_response_received_; - // The channel and its status. - grpc_channel* channel_; - bool shutting_down_ = false; - StateWatcher* watcher_ = nullptr; + // recv_trailing_metadata + grpc_metadata_array trailing_metadata_recv_; + grpc_status_code status_code_; + grpc_slice status_details_; + grpc_closure on_status_received_; - // The retryable XDS calls. - OrphanablePtr> ads_calld_; - OrphanablePtr> lrs_calld_; + // Load reporting state. + UniquePtr cluster_name_; + grpc_millis load_reporting_interval_ = 0; + OrphanablePtr reporter_; }; // @@ -400,6 +359,20 @@ void XdsClient::ChannelState::Orphan() { Unref(DEBUG_LOCATION, "ChannelState+orphaned"); } +XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld() + const { + return ads_calld_->calld(); +} + +XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld() + const { + return lrs_calld_->calld(); +} + +bool XdsClient::ChannelState::HasActiveAdsCall() const { + return ads_calld_->calld() != nullptr; +} + void XdsClient::ChannelState::MaybeStartAdsCall() { if (ads_calld_ != nullptr) return; ads_calld_.reset(New>( @@ -1213,7 +1186,13 @@ XdsClient::XdsClient(grpc_combiner* combiner, } chand_ = MakeOrphanable( Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args); - // TODO(roth): Start LDS call. + if (service_config_watcher_ != nullptr) { + // TODO(juanlishen): Start LDS call and do not return service config + // until we get the first LDS response. + GRPC_CLOSURE_INIT(&service_config_notify_, NotifyOnServiceConfig, + Ref().release(), grpc_combiner_scheduler(combiner_)); + GRPC_CLOSURE_SCHED(&service_config_notify_, GRPC_ERROR_NONE); + } } XdsClient::~XdsClient() { GRPC_COMBINER_UNREF(combiner_, "xds_client"); } @@ -1226,12 +1205,12 @@ void XdsClient::Orphan() { void XdsClient::WatchClusterData(StringView cluster, UniquePtr watcher) { - // TODO(roth): Implement. + // TODO(juanlishen): Implement. } void XdsClient::CancelClusterDataWatch(StringView cluster, ClusterWatcherInterface* watcher) { - // TODO(roth): Implement. + // TODO(juanlishen): Implement. } void XdsClient::WatchEndpointData(StringView cluster, @@ -1252,7 +1231,9 @@ void XdsClient::CancelEndpointDataWatch(StringView cluster, if (it != cluster_state_.endpoint_watchers.end()) { cluster_state_.endpoint_watchers.erase(it); } - if (cluster_state_.endpoint_watchers.empty()) chand_->StopAdsCall(); + if (chand_ != nullptr && cluster_state_.endpoint_watchers.empty()) { + chand_->StopAdsCall(); + } } void XdsClient::AddClientStats(StringView cluster, @@ -1270,7 +1251,9 @@ void XdsClient::RemoveClientStats(StringView cluster, if (it != cluster_state_.client_stats.end()) { cluster_state_.client_stats.erase(it); } - if (cluster_state_.client_stats.empty()) chand_->StopLrsCall(); + if (chand_ != nullptr && cluster_state_.client_stats.empty()) { + chand_->StopLrsCall(); + } } void XdsClient::ResetBackoff() { @@ -1280,9 +1263,6 @@ void XdsClient::ResetBackoff() { } void XdsClient::NotifyOnError(grpc_error* error) { - // TODO(roth): Once we implement the full LDS flow, it will not be - // necessary to check for the service config watcher being non-null, - // because that will always be true. if (service_config_watcher_ != nullptr) { service_config_watcher_->OnError(GRPC_ERROR_REF(error)); } @@ -1295,6 +1275,27 @@ void XdsClient::NotifyOnError(grpc_error* error) { GRPC_ERROR_UNREF(error); } +void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) { + XdsClient* self = static_cast(arg); + // TODO(roth): When we add support for WeightedClusters, select the + // LB policy based on that functionality. + static const char* json = + "{\n" + " \"loadBalancingConfig\":[\n" + " { \"xds_experimental\":{} }\n" + " ]\n" + "}"; + RefCountedPtr service_config = + ServiceConfig::Create(json, &error); + if (error != GRPC_ERROR_NONE) { + self->service_config_watcher_->OnError(error); + } else { + self->service_config_watcher_->OnServiceConfigChanged( + std::move(service_config)); + } + self->Unref(); +} + void* XdsClient::ChannelArgCopy(void* p) { XdsClient* xds_client = static_cast(p); xds_client->Ref().release(); diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index c139853d3b5..dd2113e4062 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -112,7 +112,61 @@ class XdsClient : public InternallyRefCounted { const grpc_channel_args& args); private: - class ChannelState; + // Contains a channel to the xds server and all the data related to the + // channel. Holds a ref to the xds client object. + // TODO(roth): This is separate from the XdsClient object because it was + // originally designed to be able to swap itself out in case the + // balancer name changed. Now that the balancer name is going to be + // coming from the bootstrap file, we don't really need this level of + // indirection unless we decide to support watching the bootstrap file + // for changes. At some point, if we decide that we're never going to + // need to do that, then we can eliminate this class and move its + // contents directly into the XdsClient class. + class ChannelState : public InternallyRefCounted { + public: + template + class RetryableCall; + + class AdsCallState; + class LrsCallState; + + ChannelState(RefCountedPtr xds_client, + const grpc_channel_args& args); + ~ChannelState(); + + void Orphan() override; + + grpc_channel* channel() const { return channel_; } + XdsClient* xds_client() const { return xds_client_.get(); } + AdsCallState* ads_calld() const; + LrsCallState* lrs_calld() const; + + void MaybeStartAdsCall(); + void StopAdsCall(); + + void MaybeStartLrsCall(); + void StopLrsCall(); + + bool HasActiveAdsCall() const; + + void StartConnectivityWatchLocked(); + void CancelConnectivityWatchLocked(); + + private: + class StateWatcher; + + // The owning xds client. + RefCountedPtr xds_client_; + + // The channel and its status. + grpc_channel* channel_; + bool shutting_down_ = false; + StateWatcher* watcher_ = nullptr; + + // The retryable XDS calls. + OrphanablePtr> ads_calld_; + OrphanablePtr> lrs_calld_; + }; struct ClusterState { Map> @@ -127,6 +181,10 @@ class XdsClient : public InternallyRefCounted { // Sends an error notification to all watchers. void NotifyOnError(grpc_error* error); + // TODO(juanlishen): Once we implement LDS support, this can be a + // normal method instead of a closure callback. + static void NotifyOnServiceConfig(void* arg, grpc_error* error); + // Channel arg vtable functions. static void* ChannelArgCopy(void* p); static void ChannelArgDestroy(void* p); @@ -143,6 +201,9 @@ class XdsClient : public InternallyRefCounted { UniquePtr server_name_; UniquePtr service_config_watcher_; + // TODO(juanlishen): Once we implement LDS support, this will no + // longer be needed. + grpc_closure service_config_notify_; // The channel for communicating with the xds server. OrphanablePtr chand_; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 69b92504bb8..4fa381adb4b 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -293,7 +293,7 @@ class AdsServiceImpl : public AdsService { Status StreamAggregatedResources(ServerContext* context, Stream* stream) override { - gpr_log(GPR_INFO, "LB[%p]: ADS StreamAggregatedResources starts", this); + gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this); [&]() { { grpc_core::MutexLock lock(&ads_mu_); @@ -306,7 +306,7 @@ class AdsServiceImpl : public AdsService { DiscoveryRequest request; if (!stream->Read(&request)) return; IncreaseRequestCount(); - gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this, + gpr_log(GPR_INFO, "ADS[%p]: received initial message '%s'", this, request.DebugString().c_str()); // Send response. std::vector responses_and_delays; @@ -322,7 +322,7 @@ class AdsServiceImpl : public AdsService { grpc_core::MutexLock lock(&ads_mu_); ads_cond_.WaitUntil(&ads_mu_, [this] { return ads_done_; }); }(); - gpr_log(GPR_INFO, "LB[%p]: ADS StreamAggregatedResources done", this); + gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this); return Status::OK; } @@ -343,7 +343,7 @@ class AdsServiceImpl : public AdsService { NotifyDoneWithAdsCallLocked(); responses_and_delays_.clear(); } - gpr_log(GPR_INFO, "LB[%p]: shut down", this); + gpr_log(GPR_INFO, "ADS[%p]: shut down", this); } static DiscoveryResponse BuildResponse(const ResponseArgs& args) { @@ -398,11 +398,11 @@ class AdsServiceImpl : public AdsService { private: void SendResponse(Stream* stream, const DiscoveryResponse& response, int delay_ms) { - gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms); + gpr_log(GPR_INFO, "ADS[%p]: sleeping for %d ms...", this, delay_ms); if (delay_ms > 0) { gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms)); } - gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this, + gpr_log(GPR_INFO, "ADS[%p]: Woke up! Sending response '%s'", this, response.DebugString().c_str()); IncreaseResponseCount(); stream->Write(response); @@ -424,7 +424,7 @@ class LrsServiceImpl : public LrsService { client_load_reporting_interval_seconds) {} Status StreamLoadStats(ServerContext* context, Stream* stream) override { - gpr_log(GPR_INFO, "LB[%p]: LRS StreamLoadStats starts", this); + gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this); // Read request. LoadStatsRequest request; if (stream->Read(&request)) { @@ -442,7 +442,7 @@ class LrsServiceImpl : public LrsService { // Wait for report. request.Clear(); if (stream->Read(&request)) { - gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'", + gpr_log(GPR_INFO, "LRS[%p]: received client load report message '%s'", this, request.DebugString().c_str()); GPR_ASSERT(request.cluster_stats().size() == 1); const ClusterStats& cluster_stats = request.cluster_stats()[0]; @@ -459,7 +459,7 @@ class LrsServiceImpl : public LrsService { grpc_core::MutexLock lock(&lrs_mu_); lrs_cv_.WaitUntil(&lrs_mu_, [this] { return lrs_done; }); } - gpr_log(GPR_INFO, "LB[%p]: LRS done", this); + gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this); return Status::OK; } @@ -474,7 +474,7 @@ class LrsServiceImpl : public LrsService { grpc_core::MutexLock lock(&lrs_mu_); NotifyDoneWithLrsCallLocked(); } - gpr_log(GPR_INFO, "LB[%p]: shut down", this); + gpr_log(GPR_INFO, "LRS[%p]: shut down", this); } ClientStats* WaitForLoadReport() { @@ -512,7 +512,7 @@ class LrsServiceImpl : public LrsService { bool load_report_ready_ = false; }; -class XdsEnd2endTest : public ::testing::Test { +class XdsEnd2endTest : public ::testing::TestWithParam { protected: XdsEnd2endTest(size_t num_backends, size_t num_balancers, int client_load_reporting_interval_seconds) @@ -573,8 +573,7 @@ class XdsEnd2endTest : public ::testing::Test { void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } void ResetStub(int fallback_timeout = 0, int failover_timeout = 0, - const grpc::string& expected_targets = "", - grpc::string scheme = "") { + const grpc::string& expected_targets = "") { ChannelArguments args; // TODO(juanlishen): Add setter to ChannelArguments. if (fallback_timeout > 0) { @@ -583,12 +582,21 @@ class XdsEnd2endTest : public ::testing::Test { if (failover_timeout > 0) { args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout); } + // If the parent channel is using the fake resolver, we inject the + // response generator for the parent here, and then SetNextResolution() + // will inject the xds channel's response generator via the parent's + // reponse generator. + // + // In contrast, if we are using the xds resolver, then the parent + // channel never uses a response generator, and we inject the xds + // channel's response generator here. args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, - response_generator_.get()); + GetParam() ? lb_channel_response_generator_.get() + : response_generator_.get()); if (!expected_targets.empty()) { args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets); } - if (scheme.empty()) scheme = "fake"; + grpc::string scheme = GetParam() ? "xds-experimental" : "fake"; std::ostringstream uri; uri << scheme << ":///" << kApplicationTargetName_; // TODO(dgq): templatize tests to run everything using both secure and @@ -633,8 +641,7 @@ class XdsEnd2endTest : public ::testing::Test { ++*num_total; } - std::tuple WaitForAllBackends(int num_requests_multiple_of = 1, - size_t start_index = 0, + std::tuple WaitForAllBackends(size_t start_index = 0, size_t stop_index = 0) { int num_ok = 0; int num_failure = 0; @@ -643,15 +650,11 @@ class XdsEnd2endTest : public ::testing::Test { while (!SeenAllBackends(start_index, stop_index)) { SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops); } - while (num_total % num_requests_multiple_of != 0) { - SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops); - } ResetBackendCounters(); gpr_log(GPR_INFO, - "Performed %d warm up requests (a multiple of %d) against the " - "backends. %d succeeded, %d failed, %d dropped.", - num_total, num_requests_multiple_of, num_ok, num_failure, - num_drops); + "Performed %d warm up requests against the backends. " + "%d succeeded, %d failed, %d dropped.", + num_total, num_ok, num_failure, num_drops); return std::make_tuple(num_ok, num_failure, num_drops); } @@ -686,6 +689,7 @@ class XdsEnd2endTest : public ::testing::Test { const char* service_config_json = nullptr, grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator = nullptr) { + if (GetParam()) return; // Not used with xds resolver. grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(ports); @@ -919,22 +923,6 @@ class XdsEnd2endTest : public ::testing::Test { "}"; }; -class XdsResolverTest : public XdsEnd2endTest { - public: - XdsResolverTest() : XdsEnd2endTest(0, 0, 0) {} -}; - -// Tests that if the "xds-experimental" scheme is used, xDS resolver will be -// used. -TEST_F(XdsResolverTest, XdsResolverIsUsed) { - // Use xds-experimental scheme in URI. - ResetStub(0, 0, "", "xds-experimental"); - // Send an RPC to trigger resolution. - auto unused_result = SendRpc(); - // Xds resolver returns xds_experimental as the LB policy. - EXPECT_EQ("xds_experimental", channel_->GetLoadBalancingPolicyName()); -} - class BasicTest : public XdsEnd2endTest { public: BasicTest() : XdsEnd2endTest(4, 1, 0) {} @@ -942,7 +930,7 @@ class BasicTest : public XdsEnd2endTest { // Tests that the balancer sends the correct response to the client, and the // client sends RPCs to the backends using the default child policy. -TEST_F(BasicTest, Vanilla) { +TEST_P(BasicTest, Vanilla) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcsPerAddress = 100; @@ -970,7 +958,7 @@ TEST_F(BasicTest, Vanilla) { // Tests that subchannel sharing works when the same backend is listed multiple // times. -TEST_F(BasicTest, SameBackendListedMultipleTimes) { +TEST_P(BasicTest, SameBackendListedMultipleTimes) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); // Same backend listed twice. @@ -993,7 +981,7 @@ TEST_F(BasicTest, SameBackendListedMultipleTimes) { } // Tests that RPCs will be blocked until a non-empty serverlist is received. -TEST_F(BasicTest, InitiallyEmptyServerlist) { +TEST_P(BasicTest, InitiallyEmptyServerlist) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); @@ -1029,7 +1017,7 @@ TEST_F(BasicTest, InitiallyEmptyServerlist) { // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if // all the servers are unreachable. -TEST_F(BasicTest, AllServersUnreachableFailFast) { +TEST_P(BasicTest, AllServersUnreachableFailFast) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumUnreachableServers = 5; @@ -1051,7 +1039,7 @@ TEST_F(BasicTest, AllServersUnreachableFailFast) { // Tests that RPCs fail when the backends are down, and will succeed again after // the backends are restarted. -TEST_F(BasicTest, BackendsRestart) { +TEST_P(BasicTest, BackendsRestart) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ @@ -1071,7 +1059,7 @@ TEST_F(BasicTest, BackendsRestart) { using SecureNamingTest = BasicTest; // Tests that secure naming check passes if target name is expected. -TEST_F(SecureNamingTest, TargetNameIsExpected) { +TEST_P(SecureNamingTest, TargetNameIsExpected) { // TODO(juanlishen): Use separate fake creds for the balancer channel. ResetStub(0, 0, kApplicationTargetName_ + ";lb"); SetNextResolution({}, kDefaultServiceConfig_.c_str()); @@ -1098,7 +1086,7 @@ TEST_F(SecureNamingTest, TargetNameIsExpected) { } // Tests that secure naming check fails if target name is unexpected. -TEST_F(SecureNamingTest, TargetNameIsUnexpected) { +TEST_P(SecureNamingTest, TargetNameIsUnexpected) { gpr_setenv("GRPC_XDS_BOOTSTRAP", "test/cpp/end2end/xds_bootstrap_bad.json"); ::testing::FLAGS_gtest_death_test_style = "threadsafe"; // Make sure that we blow up (via abort() from the security connector) when @@ -1117,7 +1105,7 @@ using LocalityMapTest = BasicTest; // Tests that the localities in a locality map are picked according to their // weights. -TEST_F(LocalityMapTest, WeightedRoundRobin) { +TEST_P(LocalityMapTest, WeightedRoundRobin) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; @@ -1135,7 +1123,7 @@ TEST_F(LocalityMapTest, WeightedRoundRobin) { }); ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 0); // Wait for both backends to be ready. - WaitForAllBackends(1, 0, 2); + WaitForAllBackends(0, 2); // Send kNumRpcs RPCs. CheckRpcSendOk(kNumRpcs); // The locality picking rates should be roughly equal to the expectation. @@ -1161,7 +1149,7 @@ TEST_F(LocalityMapTest, WeightedRoundRobin) { // Tests that the locality map can work properly even when it contains a large // number of localities. -TEST_F(LocalityMapTest, StressTest) { +TEST_P(LocalityMapTest, StressTest) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumLocalities = 100; @@ -1196,7 +1184,7 @@ TEST_F(LocalityMapTest, StressTest) { // Tests that the localities in a locality map are picked correctly after update // (addition, modification, deletion). -TEST_F(LocalityMapTest, UpdateMap) { +TEST_P(LocalityMapTest, UpdateMap) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; @@ -1231,7 +1219,7 @@ TEST_F(LocalityMapTest, UpdateMap) { }); ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), 5000); // Wait for the first 3 backends to be ready. - WaitForAllBackends(1, 0, 3); + WaitForAllBackends(0, 3); gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); // Send kNumRpcs RPCs. CheckRpcSendOk(kNumRpcs); @@ -1289,11 +1277,11 @@ TEST_F(LocalityMapTest, UpdateMap) { class FailoverTest : public BasicTest { public: - FailoverTest() { ResetStub(0, 100, "", ""); } + FailoverTest() { ResetStub(0, 100, ""); } }; // Localities with the highest priority are used when multiple priority exist. -TEST_F(FailoverTest, ChooseHighestPriority) { +TEST_P(FailoverTest, ChooseHighestPriority) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ @@ -1314,7 +1302,7 @@ TEST_F(FailoverTest, ChooseHighestPriority) { // If the higher priority localities are not reachable, failover to the highest // priority among the rest. -TEST_F(FailoverTest, Failover) { +TEST_P(FailoverTest, Failover) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ @@ -1338,7 +1326,7 @@ TEST_F(FailoverTest, Failover) { // If a locality with higher priority than the current one becomes ready, // switch to it. -TEST_F(FailoverTest, SwitchBackToHigherPriority) { +TEST_P(FailoverTest, SwitchBackToHigherPriority) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 100; @@ -1367,7 +1355,7 @@ TEST_F(FailoverTest, SwitchBackToHigherPriority) { // The first update only contains unavailable priorities. The second update // contains available priorities. -TEST_F(FailoverTest, UpdateInitialUnavailable) { +TEST_P(FailoverTest, UpdateInitialUnavailable) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ @@ -1402,7 +1390,7 @@ TEST_F(FailoverTest, UpdateInitialUnavailable) { // Tests that after the localities' priorities are updated, we still choose the // highest READY priority with the updated localities. -TEST_F(FailoverTest, UpdatePriority) { +TEST_P(FailoverTest, UpdatePriority) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 100; @@ -1435,7 +1423,7 @@ TEST_F(FailoverTest, UpdatePriority) { using DropTest = BasicTest; // Tests that RPCs are dropped according to the drop config. -TEST_F(DropTest, Vanilla) { +TEST_P(DropTest, Vanilla) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; @@ -1481,7 +1469,7 @@ TEST_F(DropTest, Vanilla) { } // Tests that drop config is converted correctly from per hundred. -TEST_F(DropTest, DropPerHundred) { +TEST_P(DropTest, DropPerHundred) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; @@ -1522,7 +1510,7 @@ TEST_F(DropTest, DropPerHundred) { } // Tests that drop config is converted correctly from per ten thousand. -TEST_F(DropTest, DropPerTenThousand) { +TEST_P(DropTest, DropPerTenThousand) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; @@ -1563,7 +1551,7 @@ TEST_F(DropTest, DropPerTenThousand) { } // Tests that drop is working correctly after update. -TEST_F(DropTest, Update) { +TEST_P(DropTest, Update) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; @@ -1659,7 +1647,7 @@ TEST_F(DropTest, Update) { } // Tests that all the RPCs are dropped if any drop category drops 100%. -TEST_F(DropTest, DropAll) { +TEST_P(DropTest, DropAll) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; @@ -1688,7 +1676,7 @@ using FallbackTest = BasicTest; // Tests that RPCs are handled by the fallback backends before the serverlist is // received, but will be handled by the serverlist after it's received. -TEST_F(FallbackTest, Vanilla) { +TEST_P(FallbackTest, Vanilla) { const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendsInResolution = backends_.size() / 2; @@ -1703,7 +1691,7 @@ TEST_F(FallbackTest, Vanilla) { ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), kServerlistDelayMs); // Wait until all the fallback backends are reachable. - WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */, + WaitForAllBackends(0 /* start_index */, kNumBackendsInResolution /* stop_index */); gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); CheckRpcSendOk(kNumBackendsInResolution); @@ -1718,8 +1706,7 @@ TEST_F(FallbackTest, Vanilla) { } // Wait until the serverlist reception has been processed and all backends // in the serverlist are reachable. - WaitForAllBackends(1 /* num_requests_multiple_of */, - kNumBackendsInResolution /* start_index */); + WaitForAllBackends(kNumBackendsInResolution /* start_index */); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); CheckRpcSendOk(backends_.size() - kNumBackendsInResolution); gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH =========="); @@ -1738,7 +1725,7 @@ TEST_F(FallbackTest, Vanilla) { // Tests that RPCs are handled by the updated fallback backends before // serverlist is received, -TEST_F(FallbackTest, Update) { +TEST_P(FallbackTest, Update) { const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendsInResolution = backends_.size() / 3; @@ -1755,7 +1742,7 @@ TEST_F(FallbackTest, Update) { ScheduleResponseForBalancer(0, AdsServiceImpl::BuildResponse(args), kServerlistDelayMs); // Wait until all the fallback backends are reachable. - WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */, + WaitForAllBackends(0 /* start_index */, kNumBackendsInResolution /* stop_index */); gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); CheckRpcSendOk(kNumBackendsInResolution); @@ -1774,8 +1761,7 @@ TEST_F(FallbackTest, Update) { kDefaultServiceConfig_.c_str()); // Wait until the resolution update has been processed and all the new // fallback backends are reachable. - WaitForAllBackends(1 /* num_requests_multiple_of */, - kNumBackendsInResolution /* start_index */, + WaitForAllBackends(kNumBackendsInResolution /* start_index */, kNumBackendsInResolution + kNumBackendsInResolutionUpdate /* stop_index */); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); @@ -1796,9 +1782,8 @@ TEST_F(FallbackTest, Update) { } // Wait until the serverlist reception has been processed and all backends // in the serverlist are reachable. - WaitForAllBackends(1 /* num_requests_multiple_of */, - kNumBackendsInResolution + - kNumBackendsInResolutionUpdate /* start_index */); + WaitForAllBackends(kNumBackendsInResolution + + kNumBackendsInResolutionUpdate /* start_index */); gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH =========="); CheckRpcSendOk(backends_.size() - kNumBackendsInResolution - kNumBackendsInResolutionUpdate); @@ -1819,7 +1804,7 @@ TEST_F(FallbackTest, Update) { } // Tests that fallback will kick in immediately if the balancer channel fails. -TEST_F(FallbackTest, FallbackEarlyWhenBalancerChannelFails) { +TEST_P(FallbackTest, FallbackEarlyWhenBalancerChannelFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); // Return an unreachable balancer and one fallback backend. @@ -1832,7 +1817,7 @@ TEST_F(FallbackTest, FallbackEarlyWhenBalancerChannelFails) { } // Tests that fallback will kick in immediately if the balancer call fails. -TEST_F(FallbackTest, FallbackEarlyWhenBalancerCallFails) { +TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); // Return one balancer and one fallback backend. @@ -1848,7 +1833,7 @@ TEST_F(FallbackTest, FallbackEarlyWhenBalancerCallFails) { // Tests that fallback mode is entered if balancer response is received but the // backends can't be reached. -TEST_F(FallbackTest, FallbackIfResponseReceivedButChildNotReady) { +TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) { const int kFallbackTimeoutMs = 500 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); @@ -1866,7 +1851,7 @@ TEST_F(FallbackTest, FallbackIfResponseReceivedButChildNotReady) { // Tests that fallback mode is exited if the balancer tells the client to drop // all the calls. -TEST_F(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { +TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { // Return an unreachable balancer and one fallback backend. SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()}); @@ -1890,7 +1875,7 @@ TEST_F(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { } // Tests that fallback mode is exited if the child policy becomes ready. -TEST_F(FallbackTest, FallbackModeIsExitedAfterChildRready) { +TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) { // Return an unreachable balancer and one fallback backend. SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()}); @@ -1929,7 +1914,7 @@ class BalancerUpdateTest : public XdsEnd2endTest { // Tests that the old LB call is still used after the balancer address update as // long as that call is still alive. -TEST_F(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { +TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ @@ -1982,7 +1967,7 @@ TEST_F(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { // of LBs as the one in SetUp() in order to verify that the LB channel inside // xds keeps the initial connection (which by definition is also present in the // update). -TEST_F(BalancerUpdateTest, Repeated) { +TEST_P(BalancerUpdateTest, Repeated) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ @@ -2047,7 +2032,7 @@ TEST_F(BalancerUpdateTest, Repeated) { // Tests that if the balancer is down, the RPCs will still be sent to the // backends according to the last balancer response, until a new balancer is // reachable. -TEST_F(BalancerUpdateTest, DeadUpdate) { +TEST_P(BalancerUpdateTest, DeadUpdate) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); AdsServiceImpl::ResponseArgs args({ @@ -2115,9 +2100,9 @@ TEST_F(BalancerUpdateTest, DeadUpdate) { // The re-resolution tests are deferred because they rely on the fallback mode, // which hasn't been supported. -// TODO(juanlishen): Add TEST_F(BalancerUpdateTest, ReresolveDeadBackend). +// TODO(juanlishen): Add TEST_P(BalancerUpdateTest, ReresolveDeadBackend). -// TODO(juanlishen): Add TEST_F(UpdatesWithClientLoadReportingTest, +// TODO(juanlishen): Add TEST_P(UpdatesWithClientLoadReportingTest, // ReresolveDeadBalancer) class ClientLoadReportingTest : public XdsEnd2endTest { @@ -2126,7 +2111,7 @@ class ClientLoadReportingTest : public XdsEnd2endTest { }; // Tests that the load report received at the balancer is correct. -TEST_F(ClientLoadReportingTest, Vanilla) { +TEST_P(ClientLoadReportingTest, Vanilla) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumRpcsPerAddress = 100; @@ -2167,7 +2152,7 @@ TEST_F(ClientLoadReportingTest, Vanilla) { // Tests that if the balancer restarts, the client load report contains the // stats before and after the restart correctly. -TEST_F(ClientLoadReportingTest, BalancerRestart) { +TEST_P(ClientLoadReportingTest, BalancerRestart) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumBackendsFirstPass = backends_.size() / 2; @@ -2182,7 +2167,7 @@ TEST_F(ClientLoadReportingTest, BalancerRestart) { int num_failure = 0; int num_drops = 0; std::tie(num_ok, num_failure, num_drops) = - WaitForAllBackends(/* num_requests_multiple_of */ 1, /* start_index */ 0, + WaitForAllBackends(/* start_index */ 0, /* stop_index */ kNumBackendsFirstPass); ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport(); EXPECT_EQ(static_cast(num_ok), @@ -2192,15 +2177,19 @@ TEST_F(ClientLoadReportingTest, BalancerRestart) { EXPECT_EQ(0U, client_stats->total_dropped_requests()); // Shut down the balancer. balancers_[0]->Shutdown(); - // Send 1 more request per backend. This will continue using the - // last serverlist we received from the balancer before it was shut down. + // We should continue using the last EDS response we received from the + // balancer before it was shut down. + // Note: We need to use WaitForAllBackends() here instead of just + // CheckRpcSendOk(kNumBackendsFirstPass), because when the balancer + // shuts down, the XdsClient will generate an error to the + // ServiceConfigWatcher, which will cause the xds resolver to send a + // no-op update to the LB policy. When this update gets down to the + // round_robin child policy for the locality, it will generate a new + // subchannel list, which resets the start index randomly. So we need + // to be a little more permissive here to avoid spurious failures. ResetBackendCounters(); - CheckRpcSendOk(kNumBackendsFirstPass); - int num_started = kNumBackendsFirstPass; - // Each backend should have gotten 1 request. - for (size_t i = 0; i < kNumBackendsFirstPass; ++i) { - EXPECT_EQ(1UL, backends_[i]->backend_service()->request_count()); - } + int num_started = std::get<0>(WaitForAllBackends( + /* start_index */ 0, /* stop_index */ kNumBackendsFirstPass)); // Now restart the balancer, this time pointing to the new backends. balancers_[0]->Start(server_host_); args = AdsServiceImpl::ResponseArgs({ @@ -2210,8 +2199,7 @@ TEST_F(ClientLoadReportingTest, BalancerRestart) { // Wait for queries to start going to one of the new backends. // This tells us that we're now using the new serverlist. std::tie(num_ok, num_failure, num_drops) = - WaitForAllBackends(/* num_requests_multiple_of */ 1, - /* start_index */ kNumBackendsFirstPass); + WaitForAllBackends(/* start_index */ kNumBackendsFirstPass); num_started += num_ok + num_failure + num_drops; // Send one RPC per backend. CheckRpcSendOk(kNumBackendsSecondPass); @@ -2230,7 +2218,7 @@ class ClientLoadReportingWithDropTest : public XdsEnd2endTest { }; // Tests that the drop stats are correctly reported by client load reporting. -TEST_F(ClientLoadReportingWithDropTest, Vanilla) { +TEST_P(ClientLoadReportingWithDropTest, Vanilla) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 3000; @@ -2293,6 +2281,29 @@ TEST_F(ClientLoadReportingWithDropTest, Vanilla) { EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count()); } +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BasicTest, ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, SecureNamingTest, ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, LocalityMapTest, ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FailoverTest, ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, DropTest, ::testing::Bool()); + +// Fallback does not work with xds resolver. +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FallbackTest, + ::testing::Values(false)); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BalancerUpdateTest, + ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingTest, + ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingWithDropTest, + ::testing::Bool()); + } // namespace } // namespace testing } // namespace grpc