diff --git a/BUILD b/BUILD
index 5d360a2dfc0..fa48d195455 100644
--- a/BUILD
+++ b/BUILD
@@ -3748,9 +3748,9 @@ grpc_cc_library(
     hdrs = ["//src/core:ext/filters/client_channel/resolver/fake/fake_resolver.h"],
     external_deps = [
         "absl/base:core_headers",
-        "absl/status",
-        "absl/status:statusor",
         "absl/strings",
+        "absl/time",
+        "absl/types:optional",
     ],
     language = "c++",
     visibility = [
@@ -3760,7 +3760,6 @@ grpc_cc_library(
     deps = [
         "config",
         "debug_location",
-        "endpoint_addresses",
         "gpr",
         "grpc_public_hdrs",
         "grpc_resolver",
@@ -3769,7 +3768,6 @@ grpc_cc_library(
         "uri_parser",
         "work_serializer",
         "//src/core:channel_args",
-        "//src/core:grpc_service_config",
         "//src/core:notification",
         "//src/core:ref_counted",
         "//src/core:useful",
diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
index f82ac6f53d1..598df7ec1b0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
@@ -98,7 +98,7 @@ class ChildPolicyHandler::Helper
             : parent()->child_policy_.get();
     if (child_ != latest_child_policy) return;
     if (GRPC_TRACE_FLAG_ENABLED(*(parent()->tracer_))) {
-      gpr_log(GPR_INFO, "[child_policy_handler %p] started name re-resolving",
+      gpr_log(GPR_INFO, "[child_policy_handler %p] requesting re-resolution",
              parent());
     }
     parent()->channel_control_helper()->RequestReresolution();
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 85f04a3b4a4..4d3ebc1455d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -841,14 +841,10 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
 
 void GrpcLb::Helper::RequestReresolution() {
   if (parent()->shutting_down_) return;
-  // If we are talking to a balancer, we expect to get updated addresses
-  // from the balancer, so we can ignore the re-resolution request from
-  // the child policy. Otherwise, pass the re-resolution request up to the
-  // channel.
-  if (parent()->lb_calld_ == nullptr ||
-      !parent()->lb_calld_->seen_initial_response()) {
-    parent()->channel_control_helper()->RequestReresolution();
-  }
+  // Ignore if we're not in fallback mode, because if we got the backend
+  // addresses from the balancer, re-resolving is not going to fix it.
+  if (!parent()->fallback_mode_) return;
+  parent()->channel_control_helper()->RequestReresolution();
 }
 
 //
@@ -1508,6 +1504,9 @@ void GrpcLb::ResetBackoffLocked() {
 }
 
 absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
+    gpr_log(GPR_INFO, "[grpclb %p] received update", this);
+  }
   const bool is_initial_update = lb_channel_ == nullptr;
   config_ = args.config;
   GPR_ASSERT(config_ != nullptr);
@@ -1516,11 +1515,15 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
   fallback_backend_addresses_ = std::move(args.addresses);
   if (fallback_backend_addresses_.ok()) {
     // Add null LB token attributes.
-    for (EndpointAddresses& addresses : *fallback_backend_addresses_) {
-      addresses = EndpointAddresses(
-          addresses.addresses(),
-          addresses.args().SetObject(
+    for (EndpointAddresses& endpoint : *fallback_backend_addresses_) {
+      endpoint = EndpointAddresses(
+          endpoint.addresses(),
+          endpoint.args().SetObject(
               MakeRefCounted<TokenAndClientStatsArg>("", nullptr)));
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
+        gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
+                endpoint.ToString().c_str());
+      }
     }
   }
   resolution_note_ = std::move(args.resolution_note);
@@ -1569,6 +1572,12 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
 absl::Status GrpcLb::UpdateBalancerChannelLocked() {
   // Get balancer addresses.
   EndpointAddressesList balancer_addresses = ExtractBalancerAddresses(args_);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
+    for (const auto& endpoint : balancer_addresses) {
+      gpr_log(GPR_INFO, "[grpclb %p] balancer address: %s", this,
+              endpoint.ToString().c_str());
+    }
+  }
   absl::Status status;
   if (balancer_addresses.empty()) {
     status = absl::UnavailableError("balancer address list must be non-empty");
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
index 193e3c2b422..91b0d9e7215 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
@@ -22,10 +22,9 @@
 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
 
 #include <memory>
+#include <utility>
 #include <vector>
 
-#include "absl/status/status.h"
-#include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 
 #include <grpc/support/log.h>
@@ -36,9 +35,7 @@
 #include "src/core/lib/gprpp/debug_location.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/work_serializer.h"
-#include "src/core/lib/resolver/endpoint_addresses.h"
 #include "src/core/lib/resolver/resolver_factory.h"
-#include "src/core/lib/service_config/service_config.h"
 #include "src/core/lib/uri/uri_parser.h"
 
 namespace grpc_core {
@@ -55,47 +52,38 @@ class FakeResolver : public Resolver {
 
  private:
   friend class FakeResolverResponseGenerator;
-  friend class FakeResolverResponseSetter;
 
   void ShutdownLocked() override;
 
   void MaybeSendResultLocked();
 
-  void ReturnReresolutionResult();
-
-  // passed-in parameters
-  ChannelArgs channel_args_;
   std::shared_ptr<WorkSerializer> work_serializer_;
   std::unique_ptr<ResultHandler> result_handler_;
+  ChannelArgs channel_args_;
   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
-  // If has_next_result_ is true, next_result_ is the next resolution result
-  // to be returned.
-  bool has_next_result_ = false;
-  Result next_result_;
-  // Result to use for the pretended re-resolution in
-  // RequestReresolutionLocked().
-  bool has_reresolution_result_ = false;
-  Result reresolution_result_;
+  // The next resolution result to be returned, if any.  Present when we
+  // get a result before the resolver is started.
+  absl::optional<Result> next_result_;
   // True after the call to StartLocked().
   bool started_ = false;
   // True after the call to ShutdownLocked().
   bool shutdown_ = false;
-  // if true, return failure
-  bool return_failure_ = false;
-  // pending re-resolution
-  bool reresolution_closure_pending_ = false;
 };
 
 FakeResolver::FakeResolver(ResolverArgs args)
     : work_serializer_(std::move(args.work_serializer)),
       result_handler_(std::move(args.result_handler)),
+      channel_args_(
+          // Channels sharing the same subchannels may have different resolver
+          // response generators.
+          // If we don't remove this arg, the subchannel pool will create new
+          // subchannels for the same address instead of reusing existing
+          // ones, because of the different values of this channel arg.
+          // Can't just use GRPC_ARG_NO_SUBCHANNEL_PREFIX, since that can't
+          // be passed into the channel from test code.
+          args.args.Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR)),
       response_generator_(
           args.args.GetObjectRef<FakeResolverResponseGenerator>()) {
-  // Channels sharing the same subchannels may have different resolver response
-  // generators. If we don't remove this arg, subchannel pool will create new
-  // subchannels for the same address instead of reusing existing ones because
-  // of different values of this channel arg.
-  channel_args_ = args.args.Remove(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
   if (response_generator_ != nullptr) {
     response_generator_->SetFakeResolver(Ref());
   }
@@ -107,19 +95,9 @@ void FakeResolver::StartLocked() {
 }
 
 void FakeResolver::RequestReresolutionLocked() {
-  if (has_reresolution_result_ || return_failure_) {
-    next_result_ = reresolution_result_;
-    has_next_result_ = true;
-    // Return the result in a different closure, so that we don't call
-    // back into the LB policy while it's still processing the previous
-    // update.
-    if (!reresolution_closure_pending_) {
-      reresolution_closure_pending_ = true;
-      Ref().release();  // ref held by closure
-      work_serializer_->Run([this]() { ReturnReresolutionResult(); },
-                            DEBUG_LOCATION);
-    }
-  }
+  // Re-resolution can't happen until after we return an initial result.
+  GPR_ASSERT(response_generator_ != nullptr);
+  response_generator_->ReresolutionRequested();
 }
 
 void FakeResolver::ShutdownLocked() {
@@ -132,80 +110,15 @@ void FakeResolver::ShutdownLocked() {
 
 void FakeResolver::MaybeSendResultLocked() {
   if (!started_ || shutdown_) return;
-  if (return_failure_) {
-    // TODO(roth): Change resolver result generator to be able to inject
-    // the error to be returned and to be able to independently set errors
-    // for addresses and service config.
-    Result result;
-    result.addresses = absl::UnavailableError("Resolver transient failure");
-    result.service_config = result.addresses.status();
-    result.args = channel_args_;
-    result_handler_->ReportResult(std::move(result));
-    return_failure_ = false;
-  } else if (has_next_result_) {
+  if (next_result_.has_value()) {
     // When both next_result_ and channel_args_ contain an arg with the same
-    // name, only the one in next_results_.
-    next_result_.args = next_result_.args.UnionWith(channel_args_);
-    result_handler_->ReportResult(std::move(next_result_));
-    has_next_result_ = false;
+    // name, use the one in next_result_.
+    next_result_->args = next_result_->args.UnionWith(channel_args_);
+    result_handler_->ReportResult(std::move(*next_result_));
+    next_result_.reset();
   }
 }
 
-void FakeResolver::ReturnReresolutionResult() {
-  reresolution_closure_pending_ = false;
-  MaybeSendResultLocked();
-  Unref();
-}
-
-class FakeResolverResponseSetter {
- public:
-  explicit FakeResolverResponseSetter(RefCountedPtr<FakeResolver> resolver,
-                                      Resolver::Result result,
-                                      bool has_result = false,
-                                      bool immediate = true)
-      : resolver_(std::move(resolver)),
-        result_(std::move(result)),
-        has_result_(has_result),
-        immediate_(immediate) {}
-  void SetResponseLocked();
-  void SetReresolutionResponseLocked();
-  void SetFailureLocked();
-
- private:
-  RefCountedPtr<FakeResolver> resolver_;
-  Resolver::Result result_;
-  bool has_result_;
-  bool immediate_;
-};
-
-// Deletes object when done
-void FakeResolverResponseSetter::SetReresolutionResponseLocked() {
-  if (!resolver_->shutdown_) {
-    resolver_->reresolution_result_ = std::move(result_);
-    resolver_->has_reresolution_result_ = has_result_;
-  }
-  delete this;
-}
-
-// Deletes object when done
-void FakeResolverResponseSetter::SetResponseLocked() {
-  if (!resolver_->shutdown_) {
-    resolver_->next_result_ = std::move(result_);
-    resolver_->has_next_result_ = true;
-    resolver_->MaybeSendResultLocked();
-  }
-  delete this;
-}
-
-// Deletes object when done
-void FakeResolverResponseSetter::SetFailureLocked() {
-  if (!resolver_->shutdown_) {
-    resolver_->return_failure_ = true;
-    if (immediate_) resolver_->MaybeSendResultLocked();
-  }
-  delete this;
-}
-
 //
 // FakeResolverResponseGenerator
 //
@@ -220,101 +133,73 @@ void FakeResolverResponseGenerator::SetResponseAndNotify(
   {
     MutexLock lock(&mu_);
     if (resolver_ == nullptr) {
-      has_result_ = true;
       result_ = std::move(result);
       if (notify_when_set != nullptr) notify_when_set->Notify();
       return;
     }
     resolver = resolver_->Ref();
   }
-  FakeResolverResponseSetter* arg =
-      new FakeResolverResponseSetter(resolver, std::move(result));
-  resolver->work_serializer_->Run(
-      [arg, notify_when_set]() {
-        arg->SetResponseLocked();
-        if (notify_when_set != nullptr) notify_when_set->Notify();
-      },
-      DEBUG_LOCATION);
+  SendResultToResolver(std::move(resolver), std::move(result),
+                       notify_when_set);
 }
 
-void FakeResolverResponseGenerator::SetReresolutionResponseAndNotify(
-    Resolver::Result result, Notification* notify_when_set) {
-  RefCountedPtr<FakeResolver> resolver;
+void FakeResolverResponseGenerator::SetFakeResolver(
+    RefCountedPtr<FakeResolver> resolver) {
+  Resolver::Result result;
   {
     MutexLock lock(&mu_);
-    GPR_ASSERT(resolver_ != nullptr);
-    resolver = resolver_->Ref();
+    resolver_ = resolver;
+    if (resolver_set_cv_ != nullptr) resolver_set_cv_->SignalAll();
+    if (resolver == nullptr) return;
+    if (!result_.has_value()) return;
+    result = std::move(*result_);
+    result_.reset();
   }
-  FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
-      resolver, std::move(result), true /* has_result */);
-  resolver->work_serializer_->Run(
-      [arg, notify_when_set]() {
-        arg->SetReresolutionResponseLocked();
+  SendResultToResolver(std::move(resolver), std::move(result), nullptr);
+}
+
+void FakeResolverResponseGenerator::SendResultToResolver(
+    RefCountedPtr<FakeResolver> resolver, Resolver::Result result,
+    Notification* notify_when_set) {
+  auto* resolver_ptr = resolver.get();
+  resolver_ptr->work_serializer_->Run(
+      [resolver = std::move(resolver), result = std::move(result),
+       notify_when_set]() mutable {
+        if (!resolver->shutdown_) {
+          resolver->next_result_ = std::move(result);
+          resolver->MaybeSendResultLocked();
+        }
+        if (notify_when_set != nullptr) notify_when_set->Notify();
       },
       DEBUG_LOCATION);
 }
 
-void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
-  RefCountedPtr<FakeResolver> resolver;
-  {
-    MutexLock lock(&mu_);
-    GPR_ASSERT(resolver_ != nullptr);
-    resolver = resolver_->Ref();
-  }
-  FakeResolverResponseSetter* arg =
-      new FakeResolverResponseSetter(resolver, Resolver::Result());
-  resolver->work_serializer_->Run(
-      [arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
-}
-
-void FakeResolverResponseGenerator::SetFailure() {
-  RefCountedPtr<FakeResolver> resolver;
-  {
-    MutexLock lock(&mu_);
-    GPR_ASSERT(resolver_ != nullptr);
-    resolver = resolver_->Ref();
-  }
-  FakeResolverResponseSetter* arg =
-      new FakeResolverResponseSetter(resolver, Resolver::Result());
-  resolver->work_serializer_->Run([arg]() { arg->SetFailureLocked(); },
-                                  DEBUG_LOCATION);
-}
-
-void FakeResolverResponseGenerator::SetFailureOnReresolution() {
-  RefCountedPtr<FakeResolver> resolver;
-  {
-    MutexLock lock(&mu_);
-    GPR_ASSERT(resolver_ != nullptr);
-    resolver = resolver_->Ref();
+bool FakeResolverResponseGenerator::WaitForResolverSet(absl::Duration timeout) {
+  MutexLock lock(&mu_);
+  if (resolver_ == nullptr) {
+    CondVar condition;
+    resolver_set_cv_ = &condition;
+    condition.WaitWithTimeout(&mu_, timeout);
+    resolver_set_cv_ = nullptr;
   }
-  FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
-      resolver, Resolver::Result(), false /* has_result */,
-      false /* immediate */);
-  resolver->work_serializer_->Run([arg]() { arg->SetFailureLocked(); },
-                                  DEBUG_LOCATION);
+  return resolver_ != nullptr;
 }
 
-void FakeResolverResponseGenerator::SetFakeResolver(
-    RefCountedPtr<FakeResolver> resolver) {
-  MutexLock lock(&mu_);
-  resolver_ = std::move(resolver);
-  cv_.SignalAll();
-  if (resolver_ == nullptr) return;
-  if (has_result_) {
-    FakeResolverResponseSetter* arg =
-        new FakeResolverResponseSetter(resolver_, std::move(result_));
-    resolver_->work_serializer_->Run([arg]() { arg->SetResponseLocked(); },
-                                     DEBUG_LOCATION);
-    has_result_ = false;
+bool FakeResolverResponseGenerator::WaitForReresolutionRequest(
    absl::Duration timeout) {
+  MutexLock lock(&reresolution_mu_);
+  if (!reresolution_requested_) {
+    CondVar condition;
+    reresolution_cv_ = &condition;
+    condition.WaitWithTimeout(&reresolution_mu_, timeout);
+    reresolution_cv_ = nullptr;
   }
+  return std::exchange(reresolution_requested_, false);
 }
 
-void FakeResolverResponseGenerator::WaitForResolverSet() {
-  MutexLock lock(&mu_);
-  while (resolver_ == nullptr) {
-    cv_.Wait(&mu_);
-  }
+void FakeResolverResponseGenerator::ReresolutionRequested() {
+  MutexLock lock(&reresolution_mu_);
+  reresolution_requested_ = true;
+  if (reresolution_cv_ != nullptr) reresolution_cv_->SignalAll();
 }
 
 namespace {
 
@@ -341,22 +226,6 @@ const grpc_arg_pointer_vtable
     ResponseGeneratorChannelArgCopy, ResponseGeneratorChannelArgDestroy,
     ResponseGeneratorChannelArgCmp};
 
-grpc_arg FakeResolverResponseGenerator::MakeChannelArg(
-    FakeResolverResponseGenerator* generator) {
-  return grpc_channel_arg_pointer_create(
-      const_cast<char*>(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR), generator,
-      &kChannelArgPointerVtable);
-}
-
-RefCountedPtr<FakeResolverResponseGenerator>
-FakeResolverResponseGenerator::GetFromArgs(const grpc_channel_args* args) {
-  auto* response_generator =
-      grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
-          args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
-  if (response_generator == nullptr) return nullptr;
-  return response_generator->Ref();
-}
-
 //
 // Factory
 //
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index b0463eee9d6..0a39dd42354 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -23,6 +23,8 @@
 
 #include "absl/base/thread_annotations.h"
 #include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "absl/types/optional.h"
 
 #include <grpc/grpc.h>
 
@@ -42,8 +44,7 @@ class FakeResolver;
 
 /// A mechanism for generating responses for the fake resolver.
 /// An instance of this class is passed to the fake resolver via a channel
-/// argument (see \a MakeChannelArg()) and used to inject and trigger custom
-/// resolutions.
+/// argument and used to inject and trigger custom resolutions.
 // TODO(roth): I would ideally like this to be InternallyRefCounted<>
 // instead of RefCounted<>, but external refs are currently needed to
 // encode this in channel args.  Once channel_args are converted to C++,
@@ -77,50 +78,20 @@ class FakeResolverResponseGenerator
     n.WaitForNotification();
   }
 
-  // Sets the re-resolution response, which is returned by the fake resolver
-  // when re-resolution is requested (via \a RequestReresolutionLocked()).
-  // The new re-resolution response replaces any previous re-resolution
-  // response that may have been set by a previous call.
-  // notify_when_set is an optional notification to signal when the response
-  // has been set.
-  void SetReresolutionResponseAndNotify(Resolver::Result result,
-                                        Notification* notify_when_set);
-  void SetReresolutionResponseAsync(Resolver::Result result) {
-    SetReresolutionResponseAndNotify(std::move(result), nullptr);
-  }
-  void SetReresolutionResponseSynchronously(Resolver::Result result) {
-    Notification n;
-    SetReresolutionResponseAndNotify(std::move(result), &n);
-    n.WaitForNotification();
-  }
-
-  // Unsets the re-resolution response. After this, the fake resolver will
-  // not return anything when \a RequestReresolutionLocked() is called.
-  void UnsetReresolutionResponse();
-
-  // Tells the resolver to return a transient failure.
-  void SetFailure();
-
-  // Same as SetFailure(), but instead of returning the error
-  // immediately, waits for the next call to RequestReresolutionLocked().
-  void SetFailureOnReresolution();
-
-  // Returns a channel arg containing \a generator.
-  // TODO(roth): When we have time, make this a non-static method.
-  static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator);
+  // Waits up to timeout for a re-resolution request.  Returns true if a
+  // re-resolution request is seen, or false if timeout occurs.  Returns
+  // true immediately if there was a re-resolution request since the
+  // last time this method was called.
+  bool WaitForReresolutionRequest(absl::Duration timeout);
 
-  // Returns the response generator in \a args, or null if not found.
-  static RefCountedPtr<FakeResolverResponseGenerator> GetFromArgs(
-      const grpc_channel_args* args);
+  // Wait for a resolver to be set (setting may be happening asynchronously,
+  // so this may block - consider it test only).
+  bool WaitForResolverSet(absl::Duration timeout);
 
   static absl::string_view ChannelArgName() {
     return GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR;
   }
 
-  // Wait for a resolver to be set (setting may be happening asynchronously, so
-  // this may block - consider it test only).
-  void WaitForResolverSet();
-
   static int ChannelArgsCompare(const FakeResolverResponseGenerator* a,
                                 const FakeResolverResponseGenerator* b) {
     return QsortCompare(a, b);
@@ -128,15 +99,29 @@ class FakeResolverResponseGenerator
  private:
   friend class FakeResolver;
 
+  // Set the corresponding FakeResolver to this generator.
   void SetFakeResolver(RefCountedPtr<FakeResolver> resolver);
 
+  // Called by FakeResolver when re-resolution is requested.
+  void ReresolutionRequested();
+
+  // Helper function to send a result to the resolver.
+  static void SendResultToResolver(RefCountedPtr<FakeResolver> resolver,
+                                   Resolver::Result result,
+                                   Notification* notify_when_set);
+
+  // Mutex protecting the members below.
   Mutex mu_;
-  CondVar cv_;
+  CondVar* resolver_set_cv_ ABSL_GUARDED_BY(mu_) = nullptr;
   RefCountedPtr<FakeResolver> resolver_ ABSL_GUARDED_BY(mu_);
-  Resolver::Result result_ ABSL_GUARDED_BY(mu_);
-  bool has_result_ ABSL_GUARDED_BY(mu_) = false;
+  // Temporarily stores the result when it gets set before the response
+  // generator is seen by the FakeResolver.
+  absl::optional<Resolver::Result> result_ ABSL_GUARDED_BY(mu_);
+
+  Mutex reresolution_mu_;
+  CondVar* reresolution_cv_ ABSL_GUARDED_BY(reresolution_mu_) = nullptr;
+  bool reresolution_requested_ ABSL_GUARDED_BY(reresolution_mu_) = false;
 };
 
 }  // namespace grpc_core
diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc
index a02bfa24418..97f6ec587e1 100644
--- a/test/core/client_channel/resolvers/fake_resolver_test.cc
+++ b/test/core/client_channel/resolvers/fake_resolver_test.cc
@@ -32,11 +32,10 @@
 #include "absl/container/inlined_vector.h"
 #include "absl/status/statusor.h"
 #include "absl/strings/str_format.h"
+#include "absl/synchronization/notification.h"
 #include "gtest/gtest.h"
 
 #include <grpc/grpc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
 
 #include "src/core/lib/address_utils/parse_address.h"
 #include "src/core/lib/channel/channel_args.h"
@@ -54,180 +53,175 @@
 #include "src/core/lib/uri/uri_parser.h"
 #include "test/core/util/test_config.h"
 
-class ResultHandler : public grpc_core::Resolver::ResultHandler {
- public:
-  void SetExpectedAndEvent(grpc_core::Resolver::Result expected,
-                           gpr_event* ev) {
-    grpc_core::MutexLock lock(&mu_);
-    ASSERT_EQ(ev_, nullptr);
-    expected_ = std::move(expected);
-    ev_ = ev;
-  }
+namespace grpc_core {
+namespace testing {
 
-  void ReportResult(grpc_core::Resolver::Result actual) override {
-    grpc_core::MutexLock lock(&mu_);
-    ASSERT_NE(ev_, nullptr);
-    // We only check the addresses, because that's the only thing
-    // explicitly set by the test via
-    // FakeResolverResponseGenerator::SetResponse().
-    ASSERT_TRUE(actual.addresses.ok());
-    ASSERT_EQ(actual.addresses->size(), expected_.addresses->size());
-    for (size_t i = 0; i < expected_.addresses->size(); ++i) {
-      ASSERT_EQ((*actual.addresses)[i], (*expected_.addresses)[i]);
+class FakeResolverTest : public ::testing::Test {
+ protected:
+  class ResultHandler : public Resolver::ResultHandler {
+   public:
+    void SetExpectedAndNotification(Resolver::Result expected,
+                                    absl::Notification* notification) {
+      MutexLock lock(&mu_);
+      ASSERT_EQ(notification_, nullptr);
+      expected_ = std::move(expected);
+      notification_ = notification;
     }
-    gpr_event_set(ev_, reinterpret_cast<void*>(1));
-    ev_ = nullptr;
-  }
 
- private:
-  grpc_core::Mutex mu_;
-  grpc_core::Resolver::Result expected_ ABSL_GUARDED_BY(mu_);
-  gpr_event* ev_ ABSL_GUARDED_BY(mu_) = nullptr;
-};
+    void ReportResult(Resolver::Result actual) override {
+      MutexLock lock(&mu_);
+      ASSERT_NE(notification_, nullptr);
+      // TODO(roth): Check fields other than just the addresses.
+      // Note: No good way to compare result_health_callback.
+      ASSERT_TRUE(actual.addresses.ok());
+      ASSERT_EQ(actual.addresses->size(), expected_.addresses->size());
+      for (size_t i = 0; i < expected_.addresses->size(); ++i) {
+        ASSERT_EQ((*actual.addresses)[i], (*expected_.addresses)[i]);
+      }
+      notification_->Notify();
+      notification_ = nullptr;
+    }
 
-static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
-    std::shared_ptr<grpc_core::WorkSerializer> work_serializer,
-    grpc_core::FakeResolverResponseGenerator* response_generator,
-    std::unique_ptr<grpc_core::Resolver::ResultHandler> result_handler) {
-  grpc_core::ResolverFactory* factory = grpc_core::CoreConfiguration::Get()
-                                            .resolver_registry()
-                                            .LookupResolverFactory("fake");
-  grpc_arg generator_arg =
-      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
-          response_generator);
-  grpc_channel_args channel_args = {1, &generator_arg};
-  grpc_core::ResolverArgs args;
-  args.args = grpc_core::ChannelArgs::FromC(&channel_args);
-  args.work_serializer = std::move(work_serializer);
-  args.result_handler = std::move(result_handler);
-  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
-      factory->CreateResolver(std::move(args));
-  return resolver;
-}
+   private:
+    Mutex mu_;
+    Resolver::Result expected_ ABSL_GUARDED_BY(mu_);
+    absl::Notification* notification_ ABSL_GUARDED_BY(mu_) = nullptr;
+  };
 
-// Create a new resolution containing 2 addresses.
-static grpc_core::Resolver::Result create_new_resolver_result() {
-  static size_t test_counter = 0;
-  const size_t num_addresses = 2;
-  // Create address list.
-  grpc_core::EndpointAddressesList addresses;
-  for (size_t i = 0; i < num_addresses; ++i) {
-    std::string uri_string = absl::StrFormat("ipv4:127.0.0.1:100%" PRIuPTR,
-                                             test_counter * num_addresses + i);
-    absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(uri_string);
-    EXPECT_TRUE(uri.ok());
-    grpc_resolved_address address;
-    EXPECT_TRUE(grpc_parse_uri(*uri, &address));
-    absl::InlinedVector<grpc_arg, 2> args_to_add;
-    addresses.emplace_back(address, grpc_core::ChannelArgs());
+  static OrphanablePtr<Resolver> BuildFakeResolver(
+      std::shared_ptr<WorkSerializer> work_serializer,
+      RefCountedPtr<FakeResolverResponseGenerator> response_generator,
+      std::unique_ptr<Resolver::ResultHandler> result_handler) {
+    ResolverFactory* factory =
+        CoreConfiguration::Get().resolver_registry().LookupResolverFactory(
+            "fake");
+    ResolverArgs args;
+    args.args = ChannelArgs().SetObject(std::move(response_generator));
+    args.work_serializer = std::move(work_serializer);
+    args.result_handler = std::move(result_handler);
+    return factory->CreateResolver(std::move(args));
   }
-  ++test_counter;
-  grpc_core::Resolver::Result result;
-  result.addresses = std::move(addresses);
-  return result;
-}
 
-TEST(FakeResolverTest, FakeResolver) {
-  grpc_core::ExecCtx exec_ctx;
-  std::shared_ptr<grpc_core::WorkSerializer> work_serializer =
-      std::make_shared<grpc_core::WorkSerializer>(
-          grpc_event_engine::experimental::GetDefaultEventEngine());
-  auto synchronously = [work_serializer](
-                           std::function<void()> do_this_thing) {
-    grpc_core::Notification notification;
-    work_serializer->Run(
-        [do_this_thing = std::move(do_this_thing), &notification]() mutable {
-          do_this_thing();
+  // Create a new resolution containing 2 addresses.
+  static Resolver::Result CreateResolverResult() {
+    static size_t test_counter = 0;
+    const size_t num_addresses = 2;
+    // Create address list.
+    EndpointAddressesList addresses;
+    for (size_t i = 0; i < num_addresses; ++i) {
+      std::string uri_string = absl::StrFormat(
+          "ipv4:127.0.0.1:100%" PRIuPTR, test_counter * num_addresses + i);
+      absl::StatusOr<URI> uri = URI::Parse(uri_string);
+      EXPECT_TRUE(uri.ok());
+      grpc_resolved_address address;
+      EXPECT_TRUE(grpc_parse_uri(*uri, &address));
+      absl::InlinedVector<grpc_arg, 2> args_to_add;
+      addresses.emplace_back(address, ChannelArgs());
+    }
+    ++test_counter;
+    Resolver::Result result;
+    result.addresses = std::move(addresses);
+    return result;
+  }
+
+  OrphanablePtr<Resolver> CreateResolver() {
+    result_handler_ = new ResultHandler();
+    return BuildFakeResolver(
+        work_serializer_, response_generator_,
+        std::unique_ptr<Resolver::ResultHandler>(result_handler_));
+  }
+
+  void RunSynchronously(std::function<void()> callback) {
+    Notification notification;
+    work_serializer_->Run(
+        [callback = std::move(callback), &notification]() {
+          callback();
           notification.Notify();
         },
         DEBUG_LOCATION);
     notification.WaitForNotification();
-  };
+  }
+
+  ExecCtx exec_ctx_;
+  std::shared_ptr<WorkSerializer> work_serializer_ =
+      std::make_shared<WorkSerializer>(
+          grpc_event_engine::experimental::GetDefaultEventEngine());
+  RefCountedPtr<FakeResolverResponseGenerator> response_generator_ =
+      MakeRefCounted<FakeResolverResponseGenerator>();
+  ResultHandler* result_handler_ = nullptr;
+};
+
+TEST_F(FakeResolverTest, WaitForResolverSet) {
+  EXPECT_FALSE(response_generator_->WaitForResolverSet(absl::Milliseconds(1)));
+  auto resolver = CreateResolver();
+  ASSERT_NE(resolver, nullptr);
+  EXPECT_TRUE(response_generator_->WaitForResolverSet(absl::Milliseconds(1)));
+}
+
+TEST_F(FakeResolverTest, ReturnResultBeforeResolverCreated) {
+  // Return result via response generator.
+  Resolver::Result result = CreateResolverResult();
+  response_generator_->SetResponseAsync(result);
+  // Create and start resolver.
+  auto resolver = CreateResolver();
+  ASSERT_NE(resolver, nullptr);
+  absl::Notification notification;
+  result_handler_->SetExpectedAndNotification(std::move(result),
+                                              &notification);
+  RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+  // Expect result.
+  ASSERT_TRUE(notification.WaitForNotificationWithTimeout(
+      absl::Seconds(5 * grpc_test_slowdown_factor())));
+}
+
+TEST_F(FakeResolverTest, ReturnResultBeforeResolverStarted) {
   // Create resolver.
-  ResultHandler* result_handler = new ResultHandler();
-  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
-      response_generator =
-          grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
-  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver = build_fake_resolver(
-      work_serializer, response_generator.get(),
-      std::unique_ptr<grpc_core::Resolver::ResultHandler>(result_handler));
-  ASSERT_NE(resolver.get(), nullptr);
-  synchronously([resolver = resolver.get()] { resolver->StartLocked(); });
-  // Test 1: normal resolution.
-  // next_results != NULL, reresolution_results == NULL.
-  // Expected response is next_results.
-  gpr_log(GPR_INFO, "TEST 1");
-  grpc_core::Resolver::Result result = create_new_resolver_result();
-  gpr_event ev1;
-  gpr_event_init(&ev1);
-  result_handler->SetExpectedAndEvent(result, &ev1);
-  response_generator->SetResponseSynchronously(std::move(result));
-  grpc_core::ExecCtx::Get()->Flush();
-  ASSERT_NE(gpr_event_wait(&ev1, grpc_timeout_seconds_to_deadline(5)),
-            nullptr);
-  // Test 2: update resolution.
-  // next_results != NULL, reresolution_results == NULL.
-  // Expected response is next_results.
-  gpr_log(GPR_INFO, "TEST 2");
-  result = create_new_resolver_result();
-  gpr_event ev2;
-  gpr_event_init(&ev2);
-  result_handler->SetExpectedAndEvent(result, &ev2);
-  response_generator->SetResponseSynchronously(std::move(result));
-  grpc_core::ExecCtx::Get()->Flush();
-  ASSERT_NE(gpr_event_wait(&ev2, grpc_timeout_seconds_to_deadline(5)),
-            nullptr);
-  // Test 3: normal re-resolution.
-  // next_results == NULL, reresolution_results != NULL.
-  // Expected response is reresolution_results.
-  gpr_log(GPR_INFO, "TEST 3");
-  grpc_core::Resolver::Result reresolution_result =
-      create_new_resolver_result();
-  gpr_event ev3;
-  gpr_event_init(&ev3);
-  result_handler->SetExpectedAndEvent(reresolution_result, &ev3);
-  // Set reresolution_results.
-  // No result will be returned until re-resolution is requested.
-  response_generator->SetReresolutionResponseSynchronously(
-      reresolution_result);
-  grpc_core::ExecCtx::Get()->Flush();
-  // Trigger a re-resolution.
-  synchronously(
-      [resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
-  grpc_core::ExecCtx::Get()->Flush();
-  ASSERT_NE(gpr_event_wait(&ev3, grpc_timeout_seconds_to_deadline(5)),
-            nullptr);
-  // Test 4: repeat re-resolution.
-  // next_results == NULL, reresolution_results != NULL.
-  // Expected response is reresolution_results.
-  gpr_log(GPR_INFO, "TEST 4");
-  gpr_event ev4;
-  gpr_event_init(&ev4);
-  result_handler->SetExpectedAndEvent(std::move(reresolution_result), &ev4);
-  // Trigger a re-resolution.
-  synchronously(
+  auto resolver = CreateResolver();
+  ASSERT_NE(resolver, nullptr);
+  Resolver::Result result = CreateResolverResult();
+  absl::Notification notification;
+  result_handler_->SetExpectedAndNotification(result, &notification);
+  // Return result via response generator.
+  response_generator_->SetResponseAsync(std::move(result));
+  // Start resolver.
+  RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+  // Expect result.
+  ASSERT_TRUE(notification.WaitForNotificationWithTimeout(
+      absl::Seconds(5 * grpc_test_slowdown_factor())));
+}
+
+TEST_F(FakeResolverTest, ReturnResult) {
+  // Create and start resolver.
+  auto resolver = CreateResolver();
+  ASSERT_NE(resolver, nullptr);
+  RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+  Resolver::Result result = CreateResolverResult();
+  absl::Notification notification;
+  result_handler_->SetExpectedAndNotification(result, &notification);
+  // Return result via response generator.
+  response_generator_->SetResponseAsync(std::move(result));
+  // Expect result.
+  ASSERT_TRUE(notification.WaitForNotificationWithTimeout(
+      absl::Seconds(5 * grpc_test_slowdown_factor())));
+}
+
+TEST_F(FakeResolverTest, WaitForReresolutionRequest) {
+  // Create and start resolver.
+  auto resolver = CreateResolver();
+  ASSERT_NE(resolver, nullptr);
+  RunSynchronously([resolver = resolver.get()] { resolver->StartLocked(); });
+  // No re-resolution requested yet.
+  EXPECT_FALSE(
+      response_generator_->WaitForReresolutionRequest(absl::Milliseconds(1)));
+  // Request re-resolution, then try again.
+  RunSynchronously(
       [resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
-  grpc_core::ExecCtx::Get()->Flush();
-  ASSERT_NE(gpr_event_wait(&ev4, grpc_timeout_seconds_to_deadline(5)),
-            nullptr);
-  // Test 5: normal resolution.
-  // next_results != NULL, reresolution_results != NULL.
-  // Expected response is next_results.
-  gpr_log(GPR_INFO, "TEST 5");
-  result = create_new_resolver_result();
-  gpr_event ev5;
-  gpr_event_init(&ev5);
-  result_handler->SetExpectedAndEvent(result, &ev5);
-  response_generator->SetResponseSynchronously(std::move(result));
-  grpc_core::ExecCtx::Get()->Flush();
-  ASSERT_NE(gpr_event_wait(&ev5, grpc_timeout_seconds_to_deadline(5)),
-            nullptr);
-  // Test 6: no-op.
-  // Requesting a new resolution without setting the response shouldn't trigger
-  // the resolution callback.
-  gpr_log(GPR_INFO, "TEST 6");
-  gpr_event ev6;
-  gpr_event_init(&ev6);
-  result_handler->SetExpectedAndEvent(grpc_core::Resolver::Result(), &ev6);
-  ASSERT_EQ(gpr_event_wait(&ev6, grpc_timeout_milliseconds_to_deadline(100)),
-            nullptr);
-  // Clean up.
-  resolver.reset();
+  EXPECT_TRUE(
+      response_generator_->WaitForReresolutionRequest(absl::Milliseconds(1)));
 }
 
+}  // namespace testing
+}  // namespace grpc_core
+
 int main(int argc, char** argv) {
   grpc::testing::TestEnvironment env(&argc, argv);
   ::testing::InitGoogleTest(&argc, argv);
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index 0e53196d725..46c9aa3d5b9 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -575,14 +575,23 @@ grpc_cc_test(
 
 grpc_cc_test(
     name = "no_server_test",
     srcs = ["no_server_test.cc"],
+    external_deps = [
+        "absl/status",
+        "absl/status:statusor",
+        "absl/time",
+    ],
     language = "C++",
     deps = [
         "cq_verifier",
+        "//:endpoint_addresses",
         "//:exec_ctx",
         "//:gpr",
         "//:grpc_public_hdrs",
+        "//:grpc_resolver",
         "//:grpc_resolver_fake",
         "//:ref_counted_ptr",
+        "//src/core:channel_args",
+        "//src/core:grpc_service_config",
         "//test/core/util:grpc_test_util",
     ],
 )
diff --git a/test/core/end2end/no_server_test.cc b/test/core/end2end/no_server_test.cc
index 8172ae83bd8..59e25efc9ab 100644
--- a/test/core/end2end/no_server_test.cc
+++ b/test/core/end2end/no_server_test.cc
@@ -18,6 +18,12 @@
 
 #include <grpc/grpc.h>
 
+#include <memory>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/time/time.h"
+
 #include <grpc/grpc_security.h>
 #include <grpc/slice.h>
 #include <grpc/status.h>
@@ -27,8 +33,12 @@
 #include <grpc/support/time.h>
 
 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
+#include "src/core/lib/resolver/resolver.h"
+#include "src/core/lib/service_config/service_config.h"
 #include "test/core/end2end/cq_verifier.h"
 #include "test/core/util/test_config.h"
 
@@ -43,13 +53,12 @@
 void run_test(bool wait_for_ready) {
   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
       response_generator =
          grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
-  grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
-      response_generator.get());
-  grpc_channel_args args = {1, &arg};
+  auto args = grpc_core::ChannelArgs().SetObject(response_generator).ToC();
 
   // create a call, channel to a non existant server
   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
-  grpc_channel* chan = grpc_channel_create("fake:nonexistant", creds, &args);
+  grpc_channel* chan =
+      grpc_channel_create("fake:nonexistant", creds, args.get());
   grpc_channel_credentials_release(creds);
   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(2);
   grpc_call* call = grpc_channel_create_call(
@@ -80,9 +89,13 @@ void run_test(bool wait_for_ready) {
                                    grpc_core::CqVerifier::tag(1), nullptr));
 
   {
-    response_generator->WaitForResolverSet();
+    response_generator->WaitForResolverSet(
+        absl::Seconds(5 * grpc_test_slowdown_factor()));
     grpc_core::ExecCtx exec_ctx;
-    response_generator->SetFailure();
+    grpc_core::Resolver::Result result;
+    result.addresses = absl::UnavailableError("Resolver transient failure");
+    result.service_config = result.addresses.status();
+    response_generator->SetResponseSynchronously(std::move(result));
   }
 
   // verify that all tags get completed
diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc
index 4bbbf8a2764..f5d8565f95d 100644
--- a/test/core/transport/chttp2/too_many_pings_test.cc
+++ b/test/core/transport/chttp2/too_many_pings_test.cc
@@ -467,22 +467,17 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
   // response generator to switch between the two.
   auto response_generator =
       grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
-  grpc_arg client_args[] = {
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
-      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
-          response_generator.get())};
-  grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
-                                           client_args};
+  auto client_channel_args =
+      grpc_core::ChannelArgs()
+          .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
+          .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
+          .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
+          .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
+          .SetObject(response_generator)
+          .ToC();
   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
   grpc_channel* channel =
-      grpc_channel_create("fake:///", creds, &client_channel_args);
+      grpc_channel_create("fake:///", creds, client_channel_args.get());
   grpc_channel_credentials_release(creds);
   // For a single subchannel 3 GOAWAYs would be sufficient to increase the
   // keepalive time from 1 second to beyond 5 seconds. Even though we are
@@ -539,22 +534,17 @@ TEST_F(KeepaliveThrottlingTest,
   // create a single channel with round robin load balancing policy.
   auto response_generator =
       grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
-  grpc_arg client_args[] = {
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 0),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
-      grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
-      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
-          response_generator.get())};
-  grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
-                                           client_args};
+  auto client_channel_args =
+      grpc_core::ChannelArgs()
+          .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
+          .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
+          .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
+          .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
+          .SetObject(response_generator)
+          .ToC();
   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
   grpc_channel* channel =
-      grpc_channel_create("fake:///", creds, &client_channel_args);
+      grpc_channel_create("fake:///", creds, client_channel_args.get());
   grpc_channel_credentials_release(creds);
   response_generator->SetResponseSynchronously(
       BuildResolverResult({absl::StrCat("ipv4:", server_address1),
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index ad4bbfb6490..4a2eb3f1d45 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -213,29 +213,16 @@ class FakeResolverResponseGeneratorWrapper {
     response_generator_ = std::move(other.response_generator_);
   }
 
+  void SetResponse(grpc_core::Resolver::Result result) {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator_->SetResponseSynchronously(std::move(result));
+  }
+
   void SetNextResolution(const std::vector<int>& ports,
                          const char* service_config_json = nullptr,
                          const grpc_core::ChannelArgs& per_address_args =
                              grpc_core::ChannelArgs()) {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetResponseSynchronously(
-        BuildFakeResults(ports, service_config_json, per_address_args));
-  }
-
-  void SetNextResolutionUponError(const std::vector<int>& ports) {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetReresolutionResponseSynchronously(
-        BuildFakeResults(ports));
-  }
-
-  void SetFailureOnReresolution() {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetFailureOnReresolution();
-  }
-
-  void SetResponse(grpc_core::Resolver::Result result) {
-    grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetResponseSynchronously(std::move(result));
+    SetResponse(
+        BuildFakeResults(ports, service_config_json, per_address_args));
   }
 
   grpc_core::FakeResolverResponseGenerator* Get() const {
@@ -1155,10 +1142,15 @@ TEST_F(PickFirstTest, ReresolutionNoSelected) {
         DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
         MakeConnectionFailureRegex("failed to connect to all addresses"));
   }
-  // Set a re-resolution result that contains reachable ports, so that the
+  // PF should request re-resolution.
+  gpr_log(GPR_INFO, "****** WAITING FOR RE-RESOLUTION *******");
+  EXPECT_TRUE(response_generator.Get()->WaitForReresolutionRequest(
+      absl::Seconds(5 * grpc_test_slowdown_factor())));
+  gpr_log(GPR_INFO, "****** RE-RESOLUTION SEEN *******");
+  // Send a resolver result that contains reachable ports, so that the
   // pick_first LB policy can recover soon.
-  response_generator.SetNextResolutionUponError(alive_ports);
-  gpr_log(GPR_INFO, "****** RE-RESOLUTION SET *******");
+  response_generator.SetNextResolution(alive_ports);
+  gpr_log(GPR_INFO, "****** RE-RESOLUTION SENT *******");
   WaitForServer(DEBUG_LOCATION, stub, 0, [](const Status& status) {
     EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
     EXPECT_THAT(status.error_message(),
@@ -1294,7 +1286,6 @@ TEST_F(PickFirstTest, IdleOnDisconnect) {
   CheckRpcSendOk(DEBUG_LOCATION, stub);
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
   // Stop server.  Channel should go into state IDLE.
-  response_generator.SetFailureOnReresolution();
   servers_[0]->Shutdown();
   EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
   EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_IDLE);
@@ -1603,16 +1594,20 @@ TEST_F(RoundRobinTest, ReresolveOnSubchannelConnectionFailure) {
   response_generator.SetNextResolution(ports);
   // Wait for both servers to be seen.
   WaitForServers(DEBUG_LOCATION, stub, 0, 2);
-  // Tell the fake resolver to send an update that adds the last server, but
-  // only when the LB policy requests re-resolution.
-  ports.push_back(servers_[2]->port_);
-  response_generator.SetNextResolutionUponError(ports);
   // Have server 0 send a GOAWAY. This should trigger a re-resolution.
   gpr_log(GPR_INFO, "****** SENDING GOAWAY FROM SERVER 0 *******");
   {
     grpc_core::ExecCtx exec_ctx;
     grpc_core::Server::FromC(servers_[0]->server_->c_server())->SendGoaways();
   }
+  gpr_log(GPR_INFO, "****** WAITING FOR RE-RESOLUTION REQUEST *******");
+  EXPECT_TRUE(response_generator.Get()->WaitForReresolutionRequest(
+      absl::Seconds(5 * grpc_test_slowdown_factor())));
+  gpr_log(GPR_INFO, "****** RE-RESOLUTION REQUEST SEEN *******");
+  // Now tell the fake resolver to send an update that adds the last server.
+  ports.push_back(servers_[2]->port_);
+  response_generator.SetNextResolution(ports);
   // Wait for the client to see server 2.
   WaitForServer(DEBUG_LOCATION, stub, 2);
 }
 
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 949ab9f5f1f..51ab18b45a1 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -25,9 +25,12 @@
 #include <string>
 #include <thread>
 
+#include "absl/cleanup/cleanup.h"
 #include "absl/memory/memory.h"
 #include "absl/strings/str_cat.h"
 #include "absl/strings/str_format.h"
+#include "absl/synchronization/notification.h"
+#include "absl/types/span.h"
 
 #include <grpc/grpc.h>
 #include <grpc/support/time.h>
@@ -48,6 +51,7 @@
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/config/config_vars.h"
 #include "src/core/lib/gprpp/crash.h"
+#include "src/core/lib/gprpp/debug_location.h"
 #include "src/core/lib/gprpp/env.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/iomgr/sockaddr.h"
@@ -82,12 +86,12 @@
 // 2) the retry timer is active. Again, the weak reference it holds should
 // prevent a premature call to \a glb_destroy.
 
-using std::chrono::system_clock;
-
 using grpc::lb::v1::LoadBalancer;
 using grpc::lb::v1::LoadBalanceRequest;
 using grpc::lb::v1::LoadBalanceResponse;
 
+using grpc_core::SourceLocation;
+
 namespace grpc {
 namespace testing {
 namespace {
@@ -102,8 +106,10 @@ constexpr char kDefaultServiceConfig[] =
 using BackendService = CountedService<TestServiceImpl>;
 using BalancerService = CountedService<LoadBalancer::Service>;
 
-const char g_kCallCredsMdKey[] = "call-creds";
-const char g_kCallCredsMdValue[] = "should not be received by balancer";
+const char kCallCredsMdKey[] = "call-creds";
+const char kCallCredsMdValue[] = "should not be received by balancer";
+const char kRequestMessage[] = "Live long and prosper.";
+const absl::string_view kApplicationTargetName = "application_target_name";
 
 // A test user agent string sent by the client only to the grpclb loadbalancer.
 // The backend should not see this user-agent string.
@@ -123,10 +129,10 @@ class BackendServiceImpl : public BackendService {
   }
 
   // Backend should receive the call credentials metadata.
   auto call_credentials_entry =
-      context->client_metadata().find(g_kCallCredsMdKey);
+      context->client_metadata().find(kCallCredsMdKey);
   EXPECT_NE(call_credentials_entry, context->client_metadata().end());
   if (call_credentials_entry != context->client_metadata().end()) {
-    EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
+    EXPECT_EQ(call_credentials_entry->second, kCallCredsMdValue);
   }
   IncreaseRequestCount();
   const auto status = TestServiceImpl::Echo(context, request, response);
@@ -200,125 +206,44 @@ class BalancerServiceImpl : public BalancerService {
   using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
   using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
 
-  explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
-      : client_load_reporting_interval_seconds_(
-            client_load_reporting_interval_seconds) {}
-
-  Status BalanceLoad(ServerContext* context, Stream* stream) override {
-    gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
+  void Start() {
     {
       grpc::internal::MutexLock lock(&mu_);
-      if (serverlist_done_) goto done;
+      shutdown_stream_ = false;
+      response_queue_.clear();
     }
     {
-      // The loadbalancer should see a test user agent because it was
-      // specifically configured at the client using
-      // GRPC_ARG_GRPCLB_CHANNEL_ARGS
-      auto it = context->client_metadata().find("user-agent");
-      EXPECT_TRUE(it != context->client_metadata().end());
-      if (it != context->client_metadata().end()) {
-        EXPECT_THAT(std::string(it->second.data(), it->second.length()),
-                    ::testing::StartsWith(kGrpclbSpecificUserAgentString));
-      }
-      // Balancer shouldn't receive the call credentials metadata.
-      EXPECT_EQ(context->client_metadata().find(g_kCallCredsMdKey),
-                context->client_metadata().end());
-      LoadBalanceRequest request;
-      std::vector<ResponseDelayPair> responses_and_delays;
-
-      if (!stream->Read(&request)) {
-        goto done;
-      } else {
-        if (request.has_initial_request()) {
-          grpc::internal::MutexLock lock(&mu_);
-          service_names_.push_back(request.initial_request().name());
-        }
-      }
-      IncreaseRequestCount();
-      gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
-              request.DebugString().c_str());
-
-      // TODO(juanlishen): Initial response should always be the first response.
-      if (client_load_reporting_interval_seconds_ > 0) {
-        LoadBalanceResponse initial_response;
-        initial_response.mutable_initial_response()
-            ->mutable_client_stats_report_interval()
-            ->set_seconds(client_load_reporting_interval_seconds_);
-        stream->Write(initial_response);
-      }
-
-      {
-        grpc::internal::MutexLock lock(&mu_);
-        responses_and_delays = responses_and_delays_;
-      }
-      for (const auto& response_and_delay : responses_and_delays) {
-        SendResponse(stream, response_and_delay.first,
-                     response_and_delay.second);
-      }
-      {
-        grpc::internal::MutexLock lock(&mu_);
-        while (!serverlist_done_) {
-          serverlist_cond_.Wait(&mu_);
-        }
-      }
-
-      if (client_load_reporting_interval_seconds_ > 0) {
-        request.Clear();
-        while (stream->Read(&request)) {
-          gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
-                  this, request.DebugString().c_str());
-          GPR_ASSERT(request.has_client_stats());
-          ClientStats load_report;
-          load_report.num_calls_started =
-              request.client_stats().num_calls_started();
-          load_report.num_calls_finished =
-              request.client_stats().num_calls_finished();
-          load_report.num_calls_finished_with_client_failed_to_send =
-              request.client_stats()
-                  .num_calls_finished_with_client_failed_to_send();
-          load_report.num_calls_finished_known_received =
-              request.client_stats().num_calls_finished_known_received();
-          for (const auto& drop_token_count :
-               request.client_stats().calls_finished_with_drop()) {
-            load_report
-                .drop_token_counts[drop_token_count.load_balance_token()] =
-                drop_token_count.num_calls();
-          }
-          // We need to acquire the lock here in order to prevent the
-          // notify_one below from firing before its corresponding wait is
-          // executed.
-          grpc::internal::MutexLock lock(&mu_);
-          load_report_queue_.emplace_back(std::move(load_report));
-          load_report_cond_.Signal();
-        }
-      }
+      grpc::internal::MutexLock lock(&load_report_mu_);
+      load_report_queue_.clear();
     }
-  done:
-    gpr_log(GPR_INFO, "LB[%p]: done", this);
-    return Status::OK;
   }
 
-  void add_response(const LoadBalanceResponse& response, int send_after_ms) {
-    grpc::internal::MutexLock lock(&mu_);
-    responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
+  void Shutdown() {
+    ShutdownStream();
+    gpr_log(GPR_INFO, "LB[%p]: shut down", this);
   }
 
-  void Start() {
+  void set_client_load_reporting_interval_seconds(int seconds) {
+    client_load_reporting_interval_seconds_ = seconds;
+  }
+
+  void SendResponse(LoadBalanceResponse response) {
     grpc::internal::MutexLock lock(&mu_);
-    serverlist_done_ = false;
-    responses_and_delays_.clear();
-    load_report_queue_.clear();
+    response_queue_.emplace_back(std::move(response));
+    if (response_cond_ != nullptr) response_cond_->SignalAll();
   }
 
-  void Shutdown() {
-    NotifyDoneWithServerlists();
-    gpr_log(GPR_INFO, "LB[%p]: shut down", this);
+  void ShutdownStream() {
+    grpc::internal::MutexLock lock(&mu_);
+    shutdown_stream_ = true;
+    if (response_cond_ != nullptr) response_cond_->SignalAll();
   }
 
   ClientStats WaitForLoadReport() {
-    grpc::internal::MutexLock lock(&mu_);
+    grpc::internal::MutexLock lock(&load_report_mu_);
     if (load_report_queue_.empty()) {
       while (load_report_queue_.empty()) {
-        load_report_cond_.Wait(&mu_);
+        load_report_cond_.Wait(&load_report_mu_);
       }
     }
     ClientStats load_report = std::move(load_report_queue_.front());
@@ -326,52 +251,200 @@ class BalancerServiceImpl : public BalancerService {
     return load_report;
   }
 
-  void NotifyDoneWithServerlists() {
-    grpc::internal::MutexLock lock(&mu_);
-    if (!serverlist_done_) {
-      serverlist_done_ = true;
-      serverlist_cond_.SignalAll();
-    }
-  }
-
   std::vector<std::string> service_names() {
     grpc::internal::MutexLock lock(&mu_);
     return service_names_;
   }
 
  private:
-  void SendResponse(Stream* stream, const LoadBalanceResponse& response,
-                    int delay_ms) {
-    gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
-    if (delay_ms > 0) {
-      gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(delay_ms));
+  // Request handler.
+  Status BalanceLoad(ServerContext* context, Stream* stream) override {
+    gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
+    {
+      grpc::internal::MutexLock lock(&mu_);
+      if (shutdown_stream_) {
+        gpr_log(GPR_INFO, "LB[%p]: stream shutdown at start", this);
+        return Status::OK;
+      }
+    }
+    // The loadbalancer should see a test user agent because it was
+    // specifically configured at the client using
+    // GRPC_ARG_GRPCLB_CHANNEL_ARGS
+    auto it = context->client_metadata().find("user-agent");
+    EXPECT_TRUE(it != context->client_metadata().end());
+    if (it != context->client_metadata().end()) {
+      EXPECT_THAT(std::string(it->second.data(), it->second.length()),
+                  ::testing::StartsWith(kGrpclbSpecificUserAgentString));
+    }
+    // Balancer shouldn't receive the call credentials metadata.
+    EXPECT_EQ(context->client_metadata().find(kCallCredsMdKey),
+              context->client_metadata().end());
+    std::vector<ResponseDelayPair> response_queue_and_delays;
+    // Read initial request.
+    LoadBalanceRequest request;
+    if (!stream->Read(&request)) {
+      gpr_log(GPR_INFO, "LB[%p]: stream read returned false", this);
+      return Status::OK;
+    }
+    EXPECT_TRUE(request.has_initial_request());
+    {
+      grpc::internal::MutexLock lock(&mu_);
+      service_names_.push_back(request.initial_request().name());
+    }
+    IncreaseRequestCount();
+    gpr_log(GPR_INFO, "LB[%p]: received initial message '%s'", this,
+            request.DebugString().c_str());
+    // Send initial response.
+    LoadBalanceResponse response;
+    auto* initial_response = response.mutable_initial_response();
+    if (client_load_reporting_interval_seconds_ > 0) {
+      initial_response->mutable_client_stats_report_interval()->set_seconds(
+          client_load_reporting_interval_seconds_);
     }
-    gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
-            response.DebugString().c_str());
-    IncreaseResponseCount();
     stream->Write(response);
+    // Spawn a separate thread to read requests from the client.
+    absl::Notification reader_shutdown;
+    std::thread reader(std::bind(&BalancerServiceImpl::ReadThread, this,
+                                 stream, &reader_shutdown));
+    auto thread_cleanup = absl::MakeCleanup([&]() {
+      gpr_log(GPR_INFO, "shutting down reader thread");
+      reader_shutdown.Notify();
+      gpr_log(GPR_INFO, "joining reader thread");
+      reader.join();
+      gpr_log(GPR_INFO, "joining reader thread complete");
+    });
+    // Send responses as instructed by the test.
+    while (true) {
+      auto response = GetNextResponse();
+      if (!response.has_value()) break;
+      gpr_log(GPR_INFO, "LB[%p]: Sending response: %s", this,
+              response->DebugString().c_str());
+      IncreaseResponseCount();
+      stream->Write(*response);
+    }
+    gpr_log(GPR_INFO, "LB[%p]: done", this);
+    return Status::OK;
+  }
+
+  // Reader thread spawned by request handler.
+  void ReadThread(Stream* stream, absl::Notification* shutdown) {
+    LoadBalanceRequest request;
+    while (!shutdown->HasBeenNotified() && stream->Read(&request)) {
+      gpr_log(GPR_INFO, "LB[%p]: received client load report message '%s'",
+              this, request.DebugString().c_str());
+      EXPECT_GT(client_load_reporting_interval_seconds_, 0);
+      EXPECT_TRUE(request.has_client_stats());
+      ClientStats load_report;
+      load_report.num_calls_started =
+          request.client_stats().num_calls_started();
+      load_report.num_calls_finished =
+          request.client_stats().num_calls_finished();
+      load_report.num_calls_finished_with_client_failed_to_send =
+          request.client_stats()
+              .num_calls_finished_with_client_failed_to_send();
+      load_report.num_calls_finished_known_received =
+          request.client_stats().num_calls_finished_known_received();
+      for (const auto& drop_token_count :
+           request.client_stats().calls_finished_with_drop()) {
+        load_report.drop_token_counts[drop_token_count.load_balance_token()] =
+            drop_token_count.num_calls();
+      }
+      // We need to acquire the lock here in order to prevent the notify_one
+      // below from firing before its corresponding wait is executed.
+      grpc::internal::MutexLock lock(&load_report_mu_);
+      load_report_queue_.emplace_back(std::move(load_report));
+      load_report_cond_.Signal();
+    }
+  }
+
+  // Helper for request handler thread to get the next response to be
+  // sent on the stream.  Returns nullopt when the test has requested
+  // stream shutdown.
+  absl::optional<LoadBalanceResponse> GetNextResponse() {
+    grpc::internal::MutexLock lock(&mu_);
+    if (!shutdown_stream_ && response_queue_.empty()) {
+      grpc::internal::CondVar condition;
+      response_cond_ = &condition;
+      condition.Wait(&mu_);
+      response_cond_ = nullptr;
+    }
+    if (response_queue_.empty()) return absl::nullopt;
+    LoadBalanceResponse response = response_queue_.front();
+    response_queue_.pop_front();
+    return response;
   }
 
-  const int client_load_reporting_interval_seconds_;
-  std::vector<ResponseDelayPair> responses_and_delays_;
-  std::vector<std::string> service_names_;
+  int client_load_reporting_interval_seconds_ = 0;
 
   grpc::internal::Mutex mu_;
-  grpc::internal::CondVar serverlist_cond_;
-  bool serverlist_done_ ABSL_GUARDED_BY(mu_) = false;
+  std::vector<std::string> service_names_ ABSL_GUARDED_BY(mu_);
+  bool shutdown_stream_ ABSL_GUARDED_BY(mu_) = false;
+  std::deque<LoadBalanceResponse> response_queue_ ABSL_GUARDED_BY(mu_);
+  grpc::internal::CondVar* response_cond_ ABSL_GUARDED_BY(mu_) = nullptr;
+
+  grpc::internal::Mutex load_report_mu_;
   grpc::internal::CondVar load_report_cond_;
-  std::deque<ClientStats> load_report_queue_ ABSL_GUARDED_BY(mu_);
+  std::deque<ClientStats> load_report_queue_
+      ABSL_GUARDED_BY(load_report_mu_);
 };
 
 class GrpclbEnd2endTest : public ::testing::Test {
  protected:
-  GrpclbEnd2endTest(size_t num_backends, size_t num_balancers,
-                    int client_load_reporting_interval_seconds)
-      : server_host_("localhost"),
-        num_backends_(num_backends),
-        num_balancers_(num_balancers),
-        client_load_reporting_interval_seconds_(
-            client_load_reporting_interval_seconds) {}
+  template <typename T>
+  struct ServerThread {
+    template <typename... Args>
+    explicit ServerThread(const std::string& type, Args&&... args)
+        : port_(grpc_pick_unused_port_or_die()),
+          type_(type),
+          service_(std::forward<Args>(args)...) {}
+
+    ~ServerThread() { Shutdown(); }
+
+    void Start() {
+      gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(),
+              port_);
+      GPR_ASSERT(!running_);
+      running_ = true;
+      service_.Start();
+      grpc::internal::Mutex mu;
+      // We need to acquire the lock here in order to prevent the notify_one
+      // by ServerThread::Serve from firing before the wait below is hit.
+ grpc::internal::MutexLock lock(&mu); + grpc::internal::CondVar cond; + thread_ = std::make_unique( + std::bind(&ServerThread::Serve, this, &mu, &cond)); + cond.Wait(&mu); + gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); + } + + void Serve(grpc::internal::Mutex* mu, grpc::internal::CondVar* cond) { + // We need to acquire the lock here in order to prevent the notify_one + // below from firing before its corresponding wait is executed. + grpc::internal::MutexLock lock(mu); + ServerBuilder builder; + std::shared_ptr creds(new SecureServerCredentials( + grpc_fake_transport_security_server_credentials_create())); + builder.AddListeningPort(grpc_core::LocalIpAndPort(port_), creds); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + cond->Signal(); + } + + void Shutdown() { + if (!running_) return; + gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str()); + service_.Shutdown(); + server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); + thread_->join(); + gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str()); + running_ = false; + } + + const int port_; + std::string type_; + T service_; + std::unique_ptr server_; + std::unique_ptr thread_; + bool running_ = false; + }; static void SetUpTestSuite() { // Make the backup poller poll very frequently in order to pick up @@ -391,30 +464,28 @@ class GrpclbEnd2endTest : public ::testing::Test { void SetUp() override { response_generator_ = grpc_core::MakeRefCounted(); - // Start the backends. - for (size_t i = 0; i < num_backends_; ++i) { - backends_.emplace_back(new ServerThread("backend")); - backends_.back()->Start(server_host_); - } - // Start the load balancers. - for (size_t i = 0; i < num_balancers_; ++i) { - balancers_.emplace_back(new ServerThread( - "balancer", client_load_reporting_interval_seconds_)); - balancers_.back()->Start(server_host_); - } + balancer_ = CreateAndStartBalancer(); ResetStub(); } void TearDown() override { ShutdownAllBackends(); - for (auto& balancer : balancers_) balancer->Shutdown(); + balancer_->Shutdown(); + } + + void CreateBackends(size_t num_backends) { + for (size_t i = 0; i < num_backends; ++i) { + backends_.emplace_back( + std::make_unique>("backend")); + backends_.back()->Start(); + } } void StartAllBackends() { - for (auto& backend : backends_) backend->Start(server_host_); + for (auto& backend : backends_) backend->Start(); } - void StartBackend(size_t index) { backends_[index]->Start(server_host_); } + void StartBackend(size_t index) { backends_[index]->Start(); } void ShutdownAllBackends() { for (auto& backend : backends_) backend->Shutdown(); @@ -422,7 +493,14 @@ class GrpclbEnd2endTest : public ::testing::Test { void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } - void ResetStub(int fallback_timeout = 0, + std::unique_ptr> CreateAndStartBalancer() { + auto balancer = + std::make_unique>("balancer"); + balancer->Start(); + return balancer; + } + + void ResetStub(int fallback_timeout_ms = 0, const std::string& expected_targets = "", int subchannel_cache_delay_ms = 0) { // Send a separate user agent string for the grpclb load balancer alone. 
@@ -432,7 +510,10 @@ class GrpclbEnd2endTest : public ::testing::Test { grpclb_channel_args = grpclb_channel_args.Set( GRPC_ARG_PRIMARY_USER_AGENT_STRING, kGrpclbSpecificUserAgentString); ChannelArguments args; - if (fallback_timeout > 0) args.SetGrpclbFallbackTimeout(fallback_timeout); + if (fallback_timeout_ms > 0) { + args.SetGrpclbFallbackTimeout(fallback_timeout_ms * + grpc_test_slowdown_factor()); + } args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_.get()); if (!expected_targets.empty()) { @@ -464,20 +545,19 @@ class GrpclbEnd2endTest : public ::testing::Test { GRPC_ARG_EXPERIMENTAL_GRPCLB_CHANNEL_ARGS, const_cast(grpclb_channel_args.ToC().get()), &channel_args_vtable); - std::ostringstream uri; - uri << "fake:///" << kApplicationTargetName_; // TODO(dgq): templatize tests to run everything using both secure and // insecure channel credentials. grpc_channel_credentials* channel_creds = grpc_fake_transport_security_credentials_create(); grpc_call_credentials* call_creds = grpc_md_only_test_credentials_create( - g_kCallCredsMdKey, g_kCallCredsMdValue); + kCallCredsMdKey, kCallCredsMdValue); std::shared_ptr creds( new SecureChannelCredentials(grpc_composite_channel_credentials_create( channel_creds, call_creds, nullptr))); call_creds->Unref(); channel_creds->Unref(); - channel_ = grpc::CreateCustomChannel(uri.str(), creds, args); + channel_ = grpc::CreateCustomChannel( + absl::StrCat("fake:", kApplicationTargetName), creds, args); stub_ = grpc::testing::EchoTestService::NewStub(channel_); } @@ -486,11 +566,7 @@ class GrpclbEnd2endTest : public ::testing::Test { } ClientStats WaitForLoadReports() { - ClientStats client_stats; - for (auto& balancer : balancers_) { - client_stats += balancer->service_.WaitForLoadReport(); - } - return client_stats; + return balancer_->service_.WaitForLoadReport(); } bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0) { @@ -516,102 +592,106 @@ class GrpclbEnd2endTest : public ::testing::Test { ++*num_total; } - std::tuple WaitForAllBackends(int num_requests_multiple_of = 1, - size_t start_index = 0, - size_t stop_index = 0) { + struct WaitForBackendOptions { + int timeout_seconds = 10; + int num_requests_multiple_of = 1; + + WaitForBackendOptions() {} + WaitForBackendOptions& SetTimeoutSeconds(int seconds) { + timeout_seconds = seconds; + return *this; + } + WaitForBackendOptions& SetNumRequestsMultipleOf(int multiple) { + num_requests_multiple_of = multiple; + return *this; + } + }; + + std::tuple WaitForAllBackends( + size_t start_index = 0, size_t stop_index = 0, + WaitForBackendOptions options = WaitForBackendOptions(), + SourceLocation location = SourceLocation()) { + gpr_log(GPR_INFO, "Waiting for backends [%" PRIuPTR ", %" PRIuPTR ")", + start_index, stop_index); + const absl::Time deadline = + absl::Now() + + absl::Seconds(options.timeout_seconds * grpc_test_slowdown_factor()); int num_ok = 0; int num_failure = 0; int num_drops = 0; int num_total = 0; while (!SeenAllBackends(start_index, stop_index)) { + absl::Time now = absl::Now(); + EXPECT_LT(now, deadline) << location.file() << ":" << location.line(); + if (now > deadline) break; SendRpcAndCount(&num_total, &num_ok, &num_failure, &num_drops); } - while (num_total % num_requests_multiple_of != 0) { + while (num_total % options.num_requests_multiple_of != 0) { + absl::Time now = absl::Now(); + EXPECT_LT(now, deadline) << location.file() << ":" << location.line(); + if (now > deadline) break; SendRpcAndCount(&num_total, &num_ok, &num_failure, 
&num_drops); } ResetBackendCounters(); gpr_log(GPR_INFO, "Performed %d warm up requests (a multiple of %d) against the " "backends. %d succeeded, %d failed, %d dropped.", - num_total, num_requests_multiple_of, num_ok, num_failure, + num_total, options.num_requests_multiple_of, num_ok, num_failure, num_drops); return std::make_tuple(num_ok, num_failure, num_drops); } - void WaitForBackend(size_t backend_idx) { - do { - (void)SendRpc(); - } while (backends_[backend_idx]->service_.request_count() == 0); - ResetBackendCounters(); + void WaitForBackend(size_t backend_idx, + WaitForBackendOptions options = WaitForBackendOptions(), + SourceLocation location = SourceLocation()) { + WaitForAllBackends(backend_idx, backend_idx + 1, options, location); } - struct AddressData { - int port; - std::string balancer_name; - }; - - grpc_core::EndpointAddressesList CreateLbAddressesFromAddressDataList( - const std::vector& address_data) { + grpc_core::EndpointAddressesList CreateAddressListFromPorts( + const absl::Span ports, absl::string_view balancer_name = "") { grpc_core::EndpointAddressesList addresses; - for (const auto& addr : address_data) { + for (int port : ports) { absl::StatusOr lb_uri = - grpc_core::URI::Parse(grpc_core::LocalIpUri(addr.port)); + grpc_core::URI::Parse(grpc_core::LocalIpUri(port)); GPR_ASSERT(lb_uri.ok()); grpc_resolved_address address; GPR_ASSERT(grpc_parse_uri(*lb_uri, &address)); - addresses.emplace_back( - address, grpc_core::ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, - addr.balancer_name)); + grpc_core::ChannelArgs args; + if (!balancer_name.empty()) { + args = args.Set(GRPC_ARG_DEFAULT_AUTHORITY, balancer_name); + } + addresses.emplace_back(address, args); } return addresses; } - grpc_core::Resolver::Result MakeResolverResult( - const std::vector& balancer_address_data, - const std::vector& backend_address_data = {}, + void SetNextResolutionFromEndpoints( + grpc_core::EndpointAddressesList balancers, + grpc_core::EndpointAddressesList backends = {}, const char* service_config_json = kDefaultServiceConfig) { + grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; - result.addresses = - CreateLbAddressesFromAddressDataList(backend_address_data); + result.addresses = std::move(backends); result.service_config = grpc_core::ServiceConfigImpl::Create( grpc_core::ChannelArgs(), service_config_json); GPR_ASSERT(result.service_config.ok()); - grpc_core::EndpointAddressesList balancer_addresses = - CreateLbAddressesFromAddressDataList(balancer_address_data); result.args = grpc_core::SetGrpcLbBalancerAddresses( - grpc_core::ChannelArgs(), std::move(balancer_addresses)); - return result; - } - - void SetNextResolutionAllBalancers( - const char* service_config_json = kDefaultServiceConfig) { - std::vector addresses; - for (size_t i = 0; i < balancers_.size(); ++i) { - addresses.emplace_back(AddressData{balancers_[i]->port_, ""}); - } - SetNextResolution(addresses, {}, service_config_json); + grpc_core::ChannelArgs(), std::move(balancers)); + response_generator_->SetResponseSynchronously(std::move(result)); } void SetNextResolution( - const std::vector& balancer_address_data, - const std::vector& backend_address_data = {}, + const absl::Span balancer_ports, + const absl::Span backend_ports = {}, const char* service_config_json = kDefaultServiceConfig) { - grpc_core::ExecCtx exec_ctx; - grpc_core::Resolver::Result result = MakeResolverResult( - balancer_address_data, backend_address_data, service_config_json); - 
response_generator_->SetResponseSynchronously(std::move(result)); + SetNextResolutionFromEndpoints(CreateAddressListFromPorts(balancer_ports), + CreateAddressListFromPorts(backend_ports), + service_config_json); } - void SetNextReresolutionResponse( - const std::vector& balancer_address_data, - const std::vector& backend_address_data = {}, + void SetNextResolutionDefaultBalancer( const char* service_config_json = kDefaultServiceConfig) { - grpc_core::ExecCtx exec_ctx; - response_generator_->WaitForResolverSet(); - grpc_core::Resolver::Result result = MakeResolverResult( - balancer_address_data, backend_address_data, service_config_json); - response_generator_->SetReresolutionResponseSynchronously( - std::move(result)); + SetNextResolution({balancer_->port_}, {}, service_config_json); } std::vector GetBackendPorts(size_t start_index = 0, @@ -624,10 +704,8 @@ class GrpclbEnd2endTest : public ::testing::Test { return backend_ports; } - void ScheduleResponseForBalancer(size_t i, - const LoadBalanceResponse& response, - int delay_ms) { - balancers_[i]->service_.add_response(response, delay_ms); + void SendBalancerResponse(LoadBalanceResponse response) { + balancer_->service_.SendResponse(std::move(response)); } LoadBalanceResponse BuildResponseForBackends( @@ -660,7 +738,7 @@ class GrpclbEnd2endTest : public ::testing::Test { const bool local_response = (response == nullptr); if (local_response) response = new EchoResponse; EchoRequest request; - request.set_message(kRequestMessage_); + request.set_message(kRequestMessage); if (!expected_status.ok()) { auto* error = request.mutable_param()->mutable_expected_error(); error->set_code(expected_status.error_code()); @@ -681,7 +759,7 @@ class GrpclbEnd2endTest : public ::testing::Test { const Status status = SendRpc(&response, timeout_ms, wait_for_ready); EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kRequestMessage_); + EXPECT_EQ(response.message(), kRequestMessage); } } @@ -690,159 +768,66 @@ class GrpclbEnd2endTest : public ::testing::Test { EXPECT_FALSE(status.ok()); } - template - struct ServerThread { - template - explicit ServerThread(const std::string& type, Args&&... args) - : port_(grpc_pick_unused_port_or_die()), - type_(type), - service_(std::forward(args)...) {} - - void Start(const std::string& server_host) { - gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); - GPR_ASSERT(!running_); - running_ = true; - service_.Start(); - grpc::internal::Mutex mu; - // We need to acquire the lock here in order to prevent the notify_one - // by ServerThread::Serve from firing before the wait below is hit. - grpc::internal::MutexLock lock(&mu); - grpc::internal::CondVar cond; - thread_ = std::make_unique( - std::bind(&ServerThread::Serve, this, server_host, &mu, &cond)); - cond.Wait(&mu); - gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); - } - - void Serve(const std::string& server_host, grpc::internal::Mutex* mu, - grpc::internal::CondVar* cond) { - // We need to acquire the lock here in order to prevent the notify_one - // below from firing before its corresponding wait is executed. 
- grpc::internal::MutexLock lock(mu); - std::ostringstream server_address; - server_address << server_host << ":" << port_; - ServerBuilder builder; - std::shared_ptr creds(new SecureServerCredentials( - grpc_fake_transport_security_server_credentials_create())); - builder.AddListeningPort(server_address.str(), creds); - builder.RegisterService(&service_); - server_ = builder.BuildAndStart(); - cond->Signal(); - } - - void Shutdown() { - if (!running_) return; - gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str()); - service_.Shutdown(); - server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); - thread_->join(); - gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str()); - running_ = false; - } - - const int port_; - std::string type_; - T service_; - std::unique_ptr server_; - std::unique_ptr thread_; - bool running_ = false; - }; - - const std::string server_host_; - const size_t num_backends_; - const size_t num_balancers_; - const int client_load_reporting_interval_seconds_; std::shared_ptr channel_; std::unique_ptr stub_; std::vector>> backends_; - std::vector>> balancers_; + std::unique_ptr> balancer_; grpc_core::RefCountedPtr response_generator_; - const std::string kRequestMessage_ = "Live long and prosper."; - const std::string kApplicationTargetName_ = "application_target_name"; }; -class SingleBalancerTest : public GrpclbEnd2endTest { - public: - SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {} -}; - -TEST_F(SingleBalancerTest, Vanilla) { - SetNextResolutionAllBalancers(); +TEST_F(GrpclbEnd2endTest, Vanilla) { + const size_t kNumBackends = 3; const size_t kNumRpcsPerAddress = 100; - ScheduleResponseForBalancer( - 0, BuildResponseForBackends(GetBackendPorts(), {}), 0); + CreateBackends(kNumBackends); + SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {})); + SetNextResolutionDefaultBalancer(); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); // We need to wait for all backends to come online. WaitForAllBackends(); // Send kNumRpcsPerAddress RPCs per server. - CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); - + CheckRpcSendOk(kNumRpcsPerAddress * kNumBackends); // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count()); } - balancers_[0]->service_.NotifyDoneWithServerlists(); + balancer_->service_.ShutdownStream(); // The balancer got a single request. - EXPECT_EQ(1U, balancers_[0]->service_.request_count()); + EXPECT_EQ(1U, balancer_->service_.request_count()); // and sent a single response. - EXPECT_EQ(1U, balancers_[0]->service_.response_count()); - + EXPECT_EQ(1U, balancer_->service_.response_count()); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } -TEST_F(SingleBalancerTest, SubchannelCaching) { - ResetStub(/*fallback_timeout=*/0, /*expected_targets=*/"", +TEST_F(GrpclbEnd2endTest, SubchannelCaching) { + CreateBackends(3); + ResetStub(/*fallback_timeout_ms=*/0, /*expected_targets=*/"", /*subchannel_cache_delay_ms=*/1500); - SetNextResolutionAllBalancers(); - // Initially send all backends. - ScheduleResponseForBalancer( - 0, BuildResponseForBackends(GetBackendPorts(), {}), 0); - // Then remove backends 0 and 1. - ScheduleResponseForBalancer( - 0, BuildResponseForBackends(GetBackendPorts(2), {}), 1000); + SetNextResolutionDefaultBalancer(); + // Initially send backends 0 and 1. 
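+  // (With subchannel_cache_delay_ms=1500, backend 1's subchannel should
+  // survive briefly being dropped from the serverlist below.)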
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(0, 2), {}));
+  WaitForAllBackends(0, 2);
+  // Now remove backends 0 and 1 and add backend 2.
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(2), {}));
+  WaitForBackend(2);
   // Now re-add backend 1.
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(1), {}), 1000);
-  // Wait for all backends to come online.
-  WaitForAllBackends();
-  // Send RPCs for long enough to get all responses.
-  gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(3000);
-  do {
-    CheckRpcSendOk();
-  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
-  // Backend 0 should have received less traffic than the others.
-  // Backend 1 would have received less traffic than 2 and 3.
-  gpr_log(GPR_INFO, "BACKEND 0: %" PRIuPTR " requests",
-          backends_[0]->service_.request_count());
-  EXPECT_GT(backends_[0]->service_.request_count(), 0);
-  for (size_t i = 1; i < backends_.size(); ++i) {
-    gpr_log(GPR_INFO, "BACKEND %" PRIuPTR ": %" PRIuPTR " requests", i,
-            backends_[i]->service_.request_count());
-    EXPECT_GT(backends_[i]->service_.request_count(),
-              backends_[0]->service_.request_count())
-        << "backend " << i;
-    if (i >= 2) {
-      EXPECT_GT(backends_[i]->service_.request_count(),
-                backends_[1]->service_.request_count())
-          << "backend " << i;
-    }
-  }
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(1), {}));
+  WaitForBackend(1);
   // Backend 1 should never have lost its connection from the client.
   EXPECT_EQ(1UL, backends_[1]->service_.clients().size());
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // And sent 3 responses.
-  EXPECT_EQ(3U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(3U, balancer_->service_.response_count());
 }

-TEST_F(SingleBalancerTest, ReturnServerStatus) {
-  SetNextResolutionAllBalancers();
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
+TEST_F(GrpclbEnd2endTest, ReturnServerStatus) {
+  CreateBackends(1);
+  SetNextResolutionDefaultBalancer();
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   // We need to wait for all backends to come online.
   WaitForAllBackends();
   // Send a request that the backend will fail, and make sure we get
@@ -854,29 +839,29 @@ TEST_F(SingleBalancerTest, ReturnServerStatus) {
   EXPECT_EQ(actual.error_message(), expected.error_message());
 }

-TEST_F(SingleBalancerTest, SelectGrpclbWithMigrationServiceConfig) {
-  SetNextResolutionAllBalancers(
+TEST_F(GrpclbEnd2endTest, SelectGrpclbWithMigrationServiceConfig) {
+  CreateBackends(1);
+  SetNextResolutionDefaultBalancer(
       "{\n"
       "  \"loadBalancingConfig\":[\n"
       "    { \"does_not_exist\":{} },\n"
       "    { \"grpclb\":{} }\n"
       "  ]\n"
       "}");
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   CheckRpcSendOk(1, 3000 /* timeout_ms */, true /* wait_for_ready */);
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
   // Check LB policy name for the channel.
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }

-TEST_F(SingleBalancerTest,
+TEST_F(GrpclbEnd2endTest,
        SelectGrpclbWithMigrationServiceConfigAndNoAddresses) {
-  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
+  const int kFallbackTimeoutMs = 200;
   ResetStub(kFallbackTimeoutMs);
   SetNextResolution({}, {},
                     "{\n"
@@ -898,8 +883,11 @@ TEST_F(SingleBalancerTest,
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }

-TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
-  SetNextResolutionAllBalancers(
+TEST_F(GrpclbEnd2endTest, UsePickFirstChildPolicy) {
+  const size_t kNumBackends = 2;
+  const size_t kNumRpcs = kNumBackends * 2;
+  CreateBackends(kNumBackends);
+  SetNextResolutionDefaultBalancer(
       "{\n"
       "  \"loadBalancingConfig\":[\n"
       "    { \"grpclb\":{\n"
@@ -909,11 +897,9 @@ TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
       "    } }\n"
       "  ]\n"
       "}");
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
-  const size_t kNumRpcs = num_backends_ * 2;
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // Check that all requests went to the first backend.  This verifies
   // that we used pick_first instead of round_robin as the child policy.
   EXPECT_EQ(backends_[0]->service_.request_count(), kNumRpcs);
@@ -921,15 +907,18 @@ TEST_F(SingleBalancerTest, UsePickFirstChildPolicy) {
     EXPECT_EQ(backends_[i]->service_.request_count(), 0UL);
   }
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
   // Check LB policy name for the channel.
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }

-TEST_F(SingleBalancerTest, SwapChildPolicy) {
-  SetNextResolutionAllBalancers(
+TEST_F(GrpclbEnd2endTest, SwapChildPolicy) {
+  const size_t kNumBackends = 2;
+  const size_t kNumRpcs = kNumBackends * 2;
+  CreateBackends(kNumBackends);
+  SetNextResolutionDefaultBalancer(
      "{\n"
       "  \"loadBalancingConfig\":[\n"
       "    { \"grpclb\":{\n"
@@ -939,9 +928,7 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) {
       "    } }\n"
       "  ]\n"
       "}");
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
-  const size_t kNumRpcs = num_backends_ * 2;
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
   // Check that all requests went to the first backend.  This verifies
   // that we used pick_first instead of round_robin as the child policy.
@@ -950,7 +937,7 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) {
     EXPECT_EQ(backends_[i]->service_.request_count(), 0UL);
   }
   // Send new resolution that removes child policy from service config.
-  SetNextResolutionAllBalancers();
+  SetNextResolutionDefaultBalancer();
   WaitForAllBackends();
   CheckRpcSendOk(kNumRpcs, 3000 /* timeout_ms */, true /* wait_for_ready */);
   // Check that every backend saw the same number of requests.  This verifies
@@ -959,23 +946,24 @@ TEST_F(SingleBalancerTest, SwapChildPolicy) {
     EXPECT_EQ(backends_[i]->service_.request_count(), 2UL);
   }
   // Done.
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
   // Check LB policy name for the channel.
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }

-TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
-  SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, SameBackendListedMultipleTimes) {
+  CreateBackends(1);
+  SetNextResolutionDefaultBalancer();
   // Same backend listed twice.
   std::vector<int> ports;
   ports.push_back(backends_[0]->port_);
   ports.push_back(backends_[0]->port_);
   const size_t kNumRpcsPerAddress = 10;
-  ScheduleResponseForBalancer(0, BuildResponseForBackends(ports, {}), 0);
+  SendBalancerResponse(BuildResponseForBackends(ports, {}));
   // We need to wait for the backend to come online.
   WaitForBackend(0);
   // Send kNumRpcsPerAddress RPCs per server.
@@ -985,281 +973,133 @@ TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
   // And they should have come from a single client port, because of
   // subchannel sharing.
   EXPECT_EQ(1UL, backends_[0]->service_.clients().size());
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
 }

-TEST_F(SingleBalancerTest, SecureNaming) {
-  ResetStub(0, kApplicationTargetName_ + ";lb");
-  SetNextResolution({AddressData{balancers_[0]->port_, "lb"}});
-  const size_t kNumRpcsPerAddress = 100;
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
-  // Make sure that trying to connect works without a call.
-  channel_->GetState(true /* try_to_connect */);
+TEST_F(GrpclbEnd2endTest, SecureNaming) {
+  CreateBackends(1);
+  ResetStub(/*fallback_timeout_ms=*/0,
+            absl::StrCat(kApplicationTargetName, ";lb"));
+  SetNextResolutionFromEndpoints(
+      CreateAddressListFromPorts({balancer_->port_}, "lb"));
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   // We need to wait for all backends to come online.
   WaitForAllBackends();
-  // Send kNumRpcsPerAddress RPCs per server.
-  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
-
-  // Each backend should have gotten 100 requests.
-  for (size_t i = 0; i < backends_.size(); ++i) {
-    EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
-  }
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
   // Check LB policy name for the channel.
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }

-TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
-  SetNextResolutionAllBalancers();
-  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
-  const int kCallDeadlineMs = kServerlistDelayMs * 10;
-  // First response is an empty serverlist, sent right away.
-  ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
-  // Send non-empty serverlist only after kServerlistDelayMs
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs);
-  const auto t0 = system_clock::now();
-  // Client will block: LB will initially send empty serverlist.
-  CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */);
-  const auto ellapsed_ms =
-      std::chrono::duration_cast<std::chrono::milliseconds>(
-          system_clock::now() - t0);
-  // but eventually, the LB sends a serverlist update that allows the call to
-  // proceed. The call delay must be larger than the delay in sending the
-  // populated serverlist but under the call's deadline (which is enforced by
-  // the call's deadline).
-  EXPECT_GT(ellapsed_ms.count(), kServerlistDelayMs);
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+TEST_F(GrpclbEnd2endTest, InitiallyEmptyServerlist) {
+  CreateBackends(1);
+  SetNextResolutionDefaultBalancer();
+  // First response is an empty serverlist.  RPCs should fail.
+  SendBalancerResponse(LoadBalanceResponse());
+  CheckRpcSendFailure();
+  // Now send a non-empty serverlist.
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
+  CheckRpcSendOk(1, /*timeout_ms=*/3000, /*wait_for_ready=*/true);
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent two responses.
-  EXPECT_EQ(2U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(2U, balancer_->service_.response_count());
 }

-TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
-  SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, AllServersUnreachableFailFast) {
+  SetNextResolutionDefaultBalancer();
   const size_t kNumUnreachableServers = 5;
   std::vector<int> ports;
   for (size_t i = 0; i < kNumUnreachableServers; ++i) {
     ports.push_back(grpc_pick_unused_port_or_die());
   }
-  ScheduleResponseForBalancer(0, BuildResponseForBackends(ports, {}), 0);
+  SendBalancerResponse(BuildResponseForBackends(ports, {}));
   const Status status = SendRpc();
   // The error shouldn't be DEADLINE_EXCEEDED.
   EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
 }

-TEST_F(SingleBalancerTest, Fallback) {
-  SetNextResolutionAllBalancers();
-  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
-  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
-  const size_t kNumBackendsInResolution = backends_.size() / 2;
-
-  ResetStub(kFallbackTimeoutMs);
-  std::vector<AddressData> balancer_addresses;
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  std::vector<AddressData> backend_addresses;
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
-  }
-  SetNextResolution(balancer_addresses, backend_addresses);
-
-  // Send non-empty serverlist only after kServerlistDelayMs.
-  ScheduleResponseForBalancer(
-      0,
-      BuildResponseForBackends(
-          GetBackendPorts(kNumBackendsInResolution /* start_index */), {}),
-      kServerlistDelayMs);
-
+TEST_F(GrpclbEnd2endTest, Fallback) {
+  const size_t kNumBackends = 4;
+  const size_t kNumBackendsInResolution = kNumBackends / 2;
+  CreateBackends(kNumBackends);
+  // Inject resolver result that contains the fallback backends.
+  SetNextResolution({balancer_->port_},
+                    GetBackendPorts(0, kNumBackendsInResolution));
+  // Balancer has not sent a serverlist, so we should use fallback.
   // Wait until all the fallback backends are reachable.
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    WaitForBackend(i);
-  }
-
-  // The first request.
-  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
-  CheckRpcSendOk(kNumBackendsInResolution);
-  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
-
-  // Fallback is used: each backend returned by the resolver should have
-  // gotten one request.
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    EXPECT_EQ(1U, backends_[i]->service_.request_count());
-  }
-  for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
-    EXPECT_EQ(0U, backends_[i]->service_.request_count());
-  }
-
-  // Wait until the serverlist reception has been processed and all backends
-  // in the serverlist are reachable.
-  for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
-    WaitForBackend(i);
-  }
-
-  // Send out the second request.
-  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
-  CheckRpcSendOk(backends_.size() - kNumBackendsInResolution);
-  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
-
-  // Serverlist is used: each backend returned by the balancer should
-  // have gotten one request.
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    EXPECT_EQ(0U, backends_[i]->service_.request_count());
-  }
-  for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
-    EXPECT_EQ(1U, backends_[i]->service_.request_count());
-  }
-
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  WaitForAllBackends(0, kNumBackendsInResolution,
+                     WaitForBackendOptions().SetTimeoutSeconds(20));
+  // Send serverlist.
+  SendBalancerResponse(BuildResponseForBackends(
+      GetBackendPorts(/*start_index=*/kNumBackendsInResolution), {}));
+  // Now we should be using the backends from the balancer.
+  WaitForAllBackends(kNumBackendsInResolution);
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
 }

-TEST_F(SingleBalancerTest, FallbackUpdate) {
-  SetNextResolutionAllBalancers();
-  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
-  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
-  const size_t kNumBackendsInResolution = backends_.size() / 3;
-  const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3;
-
-  ResetStub(kFallbackTimeoutMs);
-  std::vector<AddressData> balancer_addresses;
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  std::vector<AddressData> backend_addresses;
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
-  }
-  SetNextResolution(balancer_addresses, backend_addresses);
-
-  // Send non-empty serverlist only after kServerlistDelayMs.
-  ScheduleResponseForBalancer(
-      0,
-      BuildResponseForBackends(
-          GetBackendPorts(kNumBackendsInResolution +
-                          kNumBackendsInResolutionUpdate /* start_index */),
-          {}),
-      kServerlistDelayMs);
-
+TEST_F(GrpclbEnd2endTest, FallbackUpdate) {
+  const size_t kNumBackends = 6;
+  const size_t kNumBackendsInResolution = kNumBackends / 3;
+  const size_t kNumBackendsInResolutionUpdate = kNumBackends / 3;
+  ResetStub(/*fallback_timeout_ms=*/500);
+  CreateBackends(kNumBackends);
+  // Inject resolver result with fallback addresses.
+  SetNextResolution({balancer_->port_},
+                    GetBackendPorts(0, kNumBackendsInResolution));
+  // Balancer has not sent a serverlist, so we should use fallback.
   // Wait until all the fallback backends are reachable.
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    WaitForBackend(i);
-  }
-
-  // The first request.
-  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
-  CheckRpcSendOk(kNumBackendsInResolution);
-  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
-
-  // Fallback is used: each backend returned by the resolver should have
-  // gotten one request.
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    EXPECT_EQ(1U, backends_[i]->service_.request_count());
-  }
-  for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
-    EXPECT_EQ(0U, backends_[i]->service_.request_count());
-  }
-
-  balancer_addresses.clear();
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  backend_addresses.clear();
-  for (size_t i = kNumBackendsInResolution;
-       i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
-    backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
-  }
-  SetNextResolution(balancer_addresses, backend_addresses);
-
-  // Wait until the resolution update has been processed and all the new
-  // fallback backends are reachable.
-  for (size_t i = kNumBackendsInResolution;
-       i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
-    WaitForBackend(i);
-  }
-
-  // Send out the second request.
-  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
-  CheckRpcSendOk(kNumBackendsInResolutionUpdate);
-  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
-
-  // The resolution update is used: each backend in the resolution update
-  // should have gotten one request.
-  for (size_t i = 0; i < kNumBackendsInResolution; ++i) {
-    EXPECT_EQ(0U, backends_[i]->service_.request_count());
-  }
-  for (size_t i = kNumBackendsInResolution;
-       i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
-    EXPECT_EQ(1U, backends_[i]->service_.request_count());
-  }
-  for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
-       i < backends_.size(); ++i) {
-    EXPECT_EQ(0U, backends_[i]->service_.request_count());
-  }
-
-  // Wait until the serverlist reception has been processed and all backends
-  // in the serverlist are reachable.
-  for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
-       i < backends_.size(); ++i) {
-    WaitForBackend(i);
-  }
-
-  // Send out the third request.
-  gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
-  CheckRpcSendOk(backends_.size() - kNumBackendsInResolution -
-                 kNumBackendsInResolutionUpdate);
-  gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
-
-  // Serverlist is used: each backend returned by the balancer should
-  // have gotten one request.
-  for (size_t i = 0;
-       i < kNumBackendsInResolution + kNumBackendsInResolutionUpdate; ++i) {
-    EXPECT_EQ(0U, backends_[i]->service_.request_count());
-  }
-  for (size_t i = kNumBackendsInResolution + kNumBackendsInResolutionUpdate;
-       i < backends_.size(); ++i) {
-    EXPECT_EQ(1U, backends_[i]->service_.request_count());
-  }
-
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  WaitForAllBackends(0, kNumBackendsInResolution);
+  // Now send a resolver result with a different set of backend addresses.
+  SetNextResolution({balancer_->port_},
+                    GetBackendPorts(kNumBackendsInResolution,
+                                    kNumBackendsInResolution +
+                                        kNumBackendsInResolutionUpdate));
+  // Wait until the new fallback backends are reachable.
+  WaitForAllBackends(kNumBackendsInResolution,
+                     kNumBackendsInResolution +
+                         kNumBackendsInResolutionUpdate);
+  // Send non-empty serverlist.
+  SendBalancerResponse(
+      BuildResponseForBackends(GetBackendPorts(kNumBackendsInResolution +
+                                               kNumBackendsInResolutionUpdate),
                              {}));
+  // Wait for backends from balancer to be seen.
+  WaitForAllBackends(kNumBackendsInResolution +
+                     kNumBackendsInResolutionUpdate);
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
 }

-TEST_F(SingleBalancerTest,
+TEST_F(GrpclbEnd2endTest,
       FallbackAfterStartupLoseContactWithBalancerThenBackends) {
   // First two backends are fallback, last two are pointed to by balancer.
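+  // Losing the balancer and then the balancer-provided backends should
+  // send the client back to the fallback backends.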
+  const size_t kNumBackends = 4;
   const size_t kNumFallbackBackends = 2;
-  const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends;
-  std::vector<AddressData> backend_addresses;
-  for (size_t i = 0; i < kNumFallbackBackends; ++i) {
-    backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
-  }
-  std::vector<AddressData> balancer_addresses;
-  for (size_t i = 0; i < balancers_.size(); ++i) {
-    balancer_addresses.emplace_back(AddressData{balancers_[i]->port_, ""});
-  }
-  SetNextResolution(balancer_addresses, backend_addresses);
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
-      0);
+  const size_t kNumBalancerBackends = kNumBackends - kNumFallbackBackends;
+  CreateBackends(kNumBackends);
+  SetNextResolution({balancer_->port_},
+                    GetBackendPorts(0, kNumFallbackBackends));
+  SendBalancerResponse(
+      BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
   // Try to connect.
-  channel_->GetState(true /* try_to_connect */);
-  WaitForAllBackends(1 /* num_requests_multiple_of */,
-                     kNumFallbackBackends /* start_index */);
+  WaitForAllBackends(kNumFallbackBackends /* start_index */);
   // Stop balancer.  RPCs should continue going to backends from balancer.
-  balancers_[0]->Shutdown();
+  balancer_->Shutdown();
   CheckRpcSendOk(100 * kNumBalancerBackends);
   for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
     EXPECT_EQ(100UL, backends_[i]->service_.request_count());
@@ -1268,11 +1108,9 @@ TEST_F(SingleBalancerTest,
   for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
     ShutdownBackend(i);
   }
-  WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
-                     kNumFallbackBackends /* stop_index */);
+  WaitForAllBackends(0, kNumFallbackBackends);
   // Restart the backends from the balancer.  We should *not* start
-  // sending traffic back to them at this point (although the behavior
-  // in xds may be different).
+  // sending traffic back to them at this point.
   for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
     StartBackend(i);
   }
@@ -1282,35 +1120,25 @@ TEST_F(SingleBalancerTest,
   }
   // Now start the balancer again.  This should cause us to exit
   // fallback mode.
-  balancers_[0]->Start(server_host_);
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
-      0);
-  WaitForAllBackends(1 /* num_requests_multiple_of */,
-                     kNumFallbackBackends /* start_index */);
+  balancer_->Start();
+  SendBalancerResponse(
+      BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
+  WaitForAllBackends(kNumFallbackBackends);
 }

-TEST_F(SingleBalancerTest,
+TEST_F(GrpclbEnd2endTest,
       FallbackAfterStartupLoseContactWithBackendsThenBalancer) {
   // First two backends are fallback, last two are pointed to by balancer.
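+  // Here the balancer-provided backends go away while the balancer is
+  // still reachable, so RPCs fail until the balancer is lost as well.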
+  const size_t kNumBackends = 4;
   const size_t kNumFallbackBackends = 2;
-  const size_t kNumBalancerBackends = backends_.size() - kNumFallbackBackends;
-  std::vector<AddressData> backend_addresses;
-  for (size_t i = 0; i < kNumFallbackBackends; ++i) {
-    backend_addresses.emplace_back(AddressData{backends_[i]->port_, ""});
-  }
-  std::vector<AddressData> balancer_addresses;
-  for (size_t i = 0; i < balancers_.size(); ++i) {
-    balancer_addresses.emplace_back(AddressData{balancers_[i]->port_, ""});
-  }
-  SetNextResolution(balancer_addresses, backend_addresses);
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
-      0);
+  const size_t kNumBalancerBackends = kNumBackends - kNumFallbackBackends;
+  CreateBackends(kNumBackends);
+  SetNextResolution({balancer_->port_},
+                    GetBackendPorts(0, kNumFallbackBackends));
+  SendBalancerResponse(
+      BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
   // Try to connect.
-  channel_->GetState(true /* try_to_connect */);
-  WaitForAllBackends(1 /* num_requests_multiple_of */,
-                     kNumFallbackBackends /* start_index */);
+  WaitForAllBackends(kNumFallbackBackends);
   // Stop backends from balancer.  Since we are still in contact with
   // the balancer at this point, RPCs should be failing.
   for (size_t i = kNumFallbackBackends; i < backends_.size(); ++i) {
@@ -1318,9 +1146,8 @@ TEST_F(SingleBalancerTest,
   }
   CheckRpcSendFailure();
   // Stop balancer.  This should put us in fallback mode.
-  balancers_[0]->Shutdown();
-  WaitForAllBackends(1 /* num_requests_multiple_of */, 0 /* start_index */,
-                     kNumFallbackBackends /* stop_index */);
+  balancer_->Shutdown();
+  WaitForAllBackends(0, kNumFallbackBackends);
   // Restart the backends from the balancer.  We should *not* start
   // sending traffic back to them at this point (although the behavior
   // in xds may be different).
@@ -1333,99 +1160,76 @@ TEST_F(SingleBalancerTest,
   }
   // Now start the balancer again.  This should cause us to exit
   // fallback mode.
-  balancers_[0]->Start(server_host_);
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}),
-      0);
-  WaitForAllBackends(1 /* num_requests_multiple_of */,
-                     kNumFallbackBackends /* start_index */);
+  balancer_->Start();
+  SendBalancerResponse(
+      BuildResponseForBackends(GetBackendPorts(kNumFallbackBackends), {}));
+  WaitForAllBackends(kNumFallbackBackends);
 }

-TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
-  const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
+TEST_F(GrpclbEnd2endTest, FallbackEarlyWhenBalancerChannelFails) {
+  const int kFallbackTimeoutMs = 10000;
   ResetStub(kFallbackTimeoutMs);
+  CreateBackends(1);
   // Return an unreachable balancer and one fallback backend.
-  std::vector<AddressData> balancer_addresses;
-  balancer_addresses.emplace_back(
-      AddressData{grpc_pick_unused_port_or_die(), ""});
-  std::vector<AddressData> backend_addresses;
-  backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
-  SetNextResolution(balancer_addresses, backend_addresses);
+  SetNextResolution({grpc_pick_unused_port_or_die()}, GetBackendPorts());
   // Send RPC with deadline less than the fallback timeout and make sure it
   // succeeds.
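+  // (The 3000 ms deadline below is well under kFallbackTimeoutMs, so
+  // success implies the client fell back without waiting out the timer.)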
   CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 3000,
                  /* wait_for_ready */ false);
 }

-TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
-  const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
+TEST_F(GrpclbEnd2endTest, FallbackEarlyWhenBalancerCallFails) {
+  const int kFallbackTimeoutMs = 10000;
   ResetStub(kFallbackTimeoutMs);
+  CreateBackends(1);
   // Return one balancer and one fallback backend.
-  std::vector<AddressData> balancer_addresses;
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  std::vector<AddressData> backend_addresses;
-  backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
-  SetNextResolution(balancer_addresses, backend_addresses);
+  SetNextResolution({balancer_->port_}, GetBackendPorts());
   // Balancer drops call without sending a serverlist.
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // Send RPC with deadline less than the fallback timeout and make sure it
   // succeeds.
   CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 3000,
                  /* wait_for_ready */ false);
 }

-TEST_F(SingleBalancerTest, FallbackControlledByBalancerBeforeFirstServerlist) {
-  const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
+TEST_F(GrpclbEnd2endTest, FallbackControlledByBalancerBeforeFirstServerlist) {
+  const int kFallbackTimeoutMs = 10000;
   ResetStub(kFallbackTimeoutMs);
+  CreateBackends(1);
   // Return one balancer and one fallback backend.
-  std::vector<AddressData> balancer_addresses;
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  std::vector<AddressData> backend_addresses;
-  backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
-  SetNextResolution(balancer_addresses, backend_addresses);
+  SetNextResolution({balancer_->port_}, GetBackendPorts());
   // Balancer explicitly tells client to fallback.
-  LoadBalanceResponse resp;
-  resp.mutable_fallback_response();
-  ScheduleResponseForBalancer(0, resp, 0);
+  LoadBalanceResponse response;
+  response.mutable_fallback_response();
+  SendBalancerResponse(std::move(response));
   // Send RPC with deadline less than the fallback timeout and make sure it
   // succeeds.
   CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 3000,
                  /* wait_for_ready */ false);
 }

-TEST_F(SingleBalancerTest, FallbackControlledByBalancerAfterFirstServerlist) {
+TEST_F(GrpclbEnd2endTest, FallbackControlledByBalancerAfterFirstServerlist) {
+  CreateBackends(2);
   // Return one balancer and one fallback backend (backend 0).
-  std::vector<AddressData> balancer_addresses;
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  std::vector<AddressData> backend_addresses;
-  backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
-  SetNextResolution(balancer_addresses, backend_addresses);
-  // Balancer initially sends serverlist, then tells client to fall back,
-  // then sends the serverlist again.
-  // The serverlist points to backend 1.
-  LoadBalanceResponse serverlist_resp =
-      BuildResponseForBackends({backends_[1]->port_}, {});
-  LoadBalanceResponse fallback_resp;
-  fallback_resp.mutable_fallback_response();
-  ScheduleResponseForBalancer(0, serverlist_resp, 0);
-  ScheduleResponseForBalancer(0, fallback_resp, 100);
-  ScheduleResponseForBalancer(0, serverlist_resp, 100);
-  // Requests initially go to backend 1, then go to backend 0 in
-  // fallback mode, then go back to backend 1 when we exit fallback.
+  SetNextResolution({balancer_->port_}, {backends_[0]->port_});
+  // Balancer sends a serverlist pointing to backend 1.
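+  // RPCs should go to backend 1 once the serverlist is received.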
+  SendBalancerResponse(BuildResponseForBackends({backends_[1]->port_}, {}));
   WaitForBackend(1);
+  // Balancer tells client to fall back.
+  LoadBalanceResponse fallback_response;
+  fallback_response.mutable_fallback_response();
+  SendBalancerResponse(std::move(fallback_response));
   WaitForBackend(0);
+  // Balancer sends a new serverlist, so client exits fallback.
+  SendBalancerResponse(BuildResponseForBackends({backends_[1]->port_}, {}));
   WaitForBackend(1);
 }

-TEST_F(SingleBalancerTest, BackendsRestart) {
-  SetNextResolutionAllBalancers();
-  const size_t kNumRpcsPerAddress = 100;
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
-  // Make sure that trying to connect works without a call.
-  channel_->GetState(true /* try_to_connect */);
-  // Send kNumRpcsPerAddress RPCs per server.
-  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
+TEST_F(GrpclbEnd2endTest, BackendsRestart) {
+  CreateBackends(2);
+  SetNextResolutionDefaultBalancer();
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
+  WaitForAllBackends();
   // Stop backends.  RPCs should fail.
   ShutdownAllBackends();
   CheckRpcSendFailure();
@@ -1434,12 +1238,12 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
   CheckRpcSendOk(1 /* times */, 3000 /* timeout_ms */,
                  true /* wait_for_ready */);
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
 }

-TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
+TEST_F(GrpclbEnd2endTest, ServiceNameFromLbPolicyConfig) {
   constexpr char kServiceConfigWithTarget[] =
       "{\n"
       "  \"loadBalancingConfig\":[\n"
@@ -1448,20 +1252,16 @@ TEST_F(SingleBalancerTest, ServiceNameFromLbPolicyConfig) {
       "    }}\n"
       "  ]\n"
       "}";
-
-  SetNextResolutionAllBalancers(kServiceConfigWithTarget);
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
-  // Make sure that trying to connect works without a call.
-  channel_->GetState(true /* try_to_connect */);
-  // We need to wait for all backends to come online.
+  SetNextResolutionDefaultBalancer(kServiceConfigWithTarget);
+  CreateBackends(1);
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   WaitForAllBackends();
-  EXPECT_EQ(balancers_[0]->service_.service_names().back(), "test_service");
+  EXPECT_EQ(balancer_->service_.service_names().back(), "test_service");
 }

 // This death test is kept separate from the rest to ensure that it's run
 // before any others.  See https://github.com/grpc/grpc/pull/32269 for
 // details.
-using SingleBalancerDeathTest = SingleBalancerTest;
+using SingleBalancerDeathTest = GrpclbEnd2endTest;

 TEST_F(SingleBalancerDeathTest, SecureNaming) {
   GTEST_FLAG_SET(death_test_style, "threadsafe");
@@ -1469,26 +1269,24 @@ TEST_F(SingleBalancerDeathTest, SecureNaming) {
   // the name from the balancer doesn't match expectations.
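+  // (The balancer is published under authority "woops", which is not in
+  // the expected-targets list, so the security connector should abort.)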
ASSERT_DEATH_IF_SUPPORTED( { - ResetStub(0, kApplicationTargetName_ + ";lb"); - SetNextResolution({AddressData{balancers_[0]->port_, "woops"}}); + ResetStub(/*fallback_timeout_ms=*/0, + absl::StrCat(kApplicationTargetName, ";lb")); + SetNextResolutionFromEndpoints( + CreateAddressListFromPorts({balancer_->port_}, "woops")); channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1)); }, ""); } -class UpdatesTest : public GrpclbEnd2endTest { - public: - UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {} -}; - -TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) { - SetNextResolutionAllBalancers(); +TEST_F(GrpclbEnd2endTest, UpdateBalancersButKeepUsingOriginalBalancer) { + SetNextResolutionDefaultBalancer(); + CreateBackends(2); const std::vector first_backend{GetBackendPorts()[0]}; const std::vector second_backend{GetBackendPorts()[1]}; - ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}), - 0); - ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}), - 0); + SendBalancerResponse(BuildResponseForBackends(first_backend, {})); + auto balancer2 = CreateAndStartBalancer(); + balancer2->service_.SendResponse( + BuildResponseForBackends(second_backend, {})); // Wait until the first backend is ready. WaitForBackend(0); @@ -1501,19 +1299,14 @@ TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) { // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backends_[0]->service_.request_count()); - // Balancer 0 got a single request. - EXPECT_EQ(1U, balancers_[0]->service_.request_count()); - // and sent a single response. - EXPECT_EQ(1U, balancers_[0]->service_.response_count()); - EXPECT_EQ(0U, balancers_[1]->service_.request_count()); - EXPECT_EQ(0U, balancers_[1]->service_.response_count()); - EXPECT_EQ(0U, balancers_[2]->service_.request_count()); - EXPECT_EQ(0U, balancers_[2]->service_.response_count()); - - std::vector addresses; - addresses.emplace_back(AddressData{balancers_[1]->port_, ""}); + // Balancer 0 got a single request and sent a single response. + EXPECT_EQ(1U, balancer_->service_.request_count()); + EXPECT_EQ(1U, balancer_->service_.response_count()); + EXPECT_EQ(0U, balancer2->service_.request_count()); + EXPECT_EQ(0U, balancer2->service_.response_count()); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); - SetNextResolution(addresses); + SetNextResolution({balancer2->port_}); gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); EXPECT_EQ(0U, backends_[1]->service_.request_count()); @@ -1527,26 +1320,25 @@ TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) { // first balancer, which doesn't assign the second backend. 
EXPECT_EQ(0U, backends_[1]->service_.request_count()); - EXPECT_EQ(1U, balancers_[0]->service_.request_count()); - EXPECT_EQ(1U, balancers_[0]->service_.response_count()); - EXPECT_EQ(0U, balancers_[1]->service_.request_count()); - EXPECT_EQ(0U, balancers_[1]->service_.response_count()); - EXPECT_EQ(0U, balancers_[2]->service_.request_count()); - EXPECT_EQ(0U, balancers_[2]->service_.response_count()); + EXPECT_EQ(1U, balancer_->service_.request_count()); + EXPECT_EQ(1U, balancer_->service_.response_count()); + EXPECT_EQ(0U, balancer2->service_.request_count()); + EXPECT_EQ(0U, balancer2->service_.response_count()); } -// Send an update with the same set of LBs as the one in SetUp() in order to +// Send an update with the same set of LBs as the previous one in order to // verify that the LB channel inside grpclb keeps the initial connection (which // by definition is also present in the update). -TEST_F(UpdatesTest, UpdateBalancersRepeated) { - SetNextResolutionAllBalancers(); +TEST_F(GrpclbEnd2endTest, UpdateBalancersRepeated) { + CreateBackends(2); const std::vector first_backend{GetBackendPorts()[0]}; - const std::vector second_backend{GetBackendPorts()[0]}; + const std::vector second_backend{GetBackendPorts()[1]}; + SendBalancerResponse(BuildResponseForBackends(first_backend, {})); + auto balancer2 = CreateAndStartBalancer(); + balancer2->service_.SendResponse( + BuildResponseForBackends(second_backend, {})); - ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}), - 0); - ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}), - 0); + SetNextResolution({balancer_->port_, balancer2->port_}); // Wait until the first backend is ready. WaitForBackend(0); @@ -1559,22 +1351,15 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // All 10 requests should have gone to the first backend. EXPECT_EQ(10U, backends_[0]->service_.request_count()); - balancers_[0]->service_.NotifyDoneWithServerlists(); - // Balancer 0 got a single request. - EXPECT_EQ(1U, balancers_[0]->service_.request_count()); - // and sent a single response. - EXPECT_EQ(1U, balancers_[0]->service_.response_count()); - EXPECT_EQ(0U, balancers_[1]->service_.request_count()); - EXPECT_EQ(0U, balancers_[1]->service_.response_count()); - EXPECT_EQ(0U, balancers_[2]->service_.request_count()); - EXPECT_EQ(0U, balancers_[2]->service_.response_count()); - - std::vector addresses; - addresses.emplace_back(AddressData{balancers_[0]->port_, ""}); - addresses.emplace_back(AddressData{balancers_[1]->port_, ""}); - addresses.emplace_back(AddressData{balancers_[2]->port_, ""}); + balancer_->service_.ShutdownStream(); + // Balancer 0 got a single request and sent a single response. + EXPECT_EQ(1U, balancer_->service_.request_count()); + EXPECT_EQ(1U, balancer_->service_.response_count()); + EXPECT_EQ(0U, balancer2->service_.request_count()); + EXPECT_EQ(0U, balancer2->service_.response_count()); + gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 =========="); - SetNextResolution(addresses); + SetNextResolution({balancer_->port_, balancer2->port_}); gpr_log(GPR_INFO, "========= UPDATE 1 DONE =========="); EXPECT_EQ(0U, backends_[1]->service_.request_count()); @@ -1587,39 +1372,19 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { // grpclb continued using the original LB call to the first balancer, which // doesn't assign the second backend. 
   EXPECT_EQ(0U, backends_[1]->service_.request_count());
-  balancers_[0]->service_.NotifyDoneWithServerlists();
-
-  addresses.clear();
-  addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
-  gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 2 ==========");
-  SetNextResolution(addresses);
-  gpr_log(GPR_INFO, "========= UPDATE 2 DONE ==========");
-
-  EXPECT_EQ(0U, backends_[1]->service_.request_count());
-  deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
-                          gpr_time_from_millis(10000, GPR_TIMESPAN));
-  // Send 10 seconds worth of RPCs
-  do {
-    CheckRpcSendOk();
-  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
-  // grpclb continued using the original LB call to the first balancer, which
-  // doesn't assign the second backend.
-  EXPECT_EQ(0U, backends_[1]->service_.request_count());
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
 }
 
-TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
-  std::vector<AddressData> addresses;
-  addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  SetNextResolution(addresses);
+TEST_F(GrpclbEnd2endTest, UpdateBalancersDeadUpdate) {
+  SetNextResolutionDefaultBalancer();
+  CreateBackends(2);
   const std::vector<int> first_backend{GetBackendPorts()[0]};
   const std::vector<int> second_backend{GetBackendPorts()[1]};
-  ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
-                              0);
-  ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
-                              0);
+  SendBalancerResponse(BuildResponseForBackends(first_backend, {}));
+  auto balancer2 = CreateAndStartBalancer();
+  balancer2->service_.SendResponse(
+      BuildResponseForBackends(second_backend, {}));
 
   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -1630,7 +1395,7 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
 
   // Kill balancer 0
   gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
-  balancers_[0]->Shutdown();
+  balancer_->Shutdown();
   gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
 
   // This is serviced by the existing RR policy
@@ -1642,18 +1407,14 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
   EXPECT_EQ(0U, backends_[1]->service_.request_count());
 
   // Balancer 0 got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
-  EXPECT_EQ(0U, balancers_[1]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[1]->service_.response_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.response_count());
-
-  addresses.clear();
-  addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
+  EXPECT_EQ(1U, balancer_->service_.response_count());
+  EXPECT_EQ(0U, balancer2->service_.request_count());
+  EXPECT_EQ(0U, balancer2->service_.response_count());
+
   gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
-  SetNextResolution(addresses);
+  SetNextResolution({balancer2->port_});
   gpr_log(GPR_INFO, "========= UPDATE 1 DONE ==========");
 
   // Wait until update has been processed, as signaled by the second backend
@@ -1670,98 +1431,63 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
   // All 10 requests should have gone to the second backend.
   EXPECT_EQ(10U, backends_[1]->service_.request_count());
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
   // The second balancer, published as part of the first update, may end up
   // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
   // firing races with the arrival of the update containing the second
   // balancer.
-  EXPECT_GE(balancers_[1]->service_.request_count(), 1U);
-  EXPECT_GE(balancers_[1]->service_.response_count(), 1U);
-  EXPECT_LE(balancers_[1]->service_.request_count(), 2U);
-  EXPECT_LE(balancers_[1]->service_.response_count(), 2U);
-  EXPECT_EQ(0U, balancers_[2]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+  EXPECT_GE(balancer2->service_.request_count(), 1U);
+  EXPECT_GE(balancer2->service_.response_count(), 1U);
+  EXPECT_LE(balancer2->service_.request_count(), 2U);
+  EXPECT_LE(balancer2->service_.response_count(), 2U);
 }
 
-TEST_F(UpdatesTest, ReresolveDeadBackend) {
-  ResetStub(500);
+TEST_F(GrpclbEnd2endTest, ReresolveDeadBackend) {
+  ResetStub(/*fallback_timeout_ms=*/500);
+  CreateBackends(2);
   // The first resolution contains the addresses of a balancer that never
   // responds, and a fallback backend.
-  std::vector<AddressData> balancer_addresses;
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  std::vector<AddressData> backend_addresses;
-  backend_addresses.emplace_back(AddressData{backends_[0]->port_, ""});
-  SetNextResolution(balancer_addresses, backend_addresses);
-  // Ask channel to connect to trigger resolver creation.
-  channel_->GetState(true);
-  // The re-resolution result will contain the addresses of the same balancer
-  // and a new fallback backend.
-  balancer_addresses.clear();
-  balancer_addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  backend_addresses.clear();
-  backend_addresses.emplace_back(AddressData{backends_[1]->port_, ""});
-  SetNextReresolutionResponse(balancer_addresses, backend_addresses);
-
+  SetNextResolution({balancer_->port_}, {backends_[0]->port_});
   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
   CheckRpcSendOk(10);
   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
   // All 10 requests should have gone to the fallback backend.
   EXPECT_EQ(10U, backends_[0]->service_.request_count());
-
   // Kill backend 0.
   gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
   backends_[0]->Shutdown();
   gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
-
-  // Wait until re-resolution has finished, as signaled by the second backend
+  // This should trigger re-resolution.
+  EXPECT_TRUE(response_generator_->WaitForReresolutionRequest(
+      absl::Seconds(5 * grpc_test_slowdown_factor())));
+  // The re-resolution result will contain the addresses of the same balancer
+  // and a new fallback backend.
+  SetNextResolution({balancer_->port_}, {backends_[1]->port_});
+  // Wait until re-resolution has been seen, as signaled by the second backend
  // receiving a request.
   WaitForBackend(1);
-
   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
   CheckRpcSendOk(10);
   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
   // All 10 requests should have gone to the second backend.
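The ReresolveDeadBackend test above (and ReresolveDeadBalancer below) now follows an explicit handshake: kill the backend or balancer, block until the channel asks the fake resolver to re-resolve via response_generator_->WaitForReresolutionRequest(), and only then inject the next resolution result. One plausible shape for such a wait, sketched with standard-library primitives (the class and member names here are illustrative, not the actual FakeResolverResponseGenerator internals):

#include <chrono>
#include <condition_variable>
#include <mutex>

// Illustrative waiter: the resolver side signals when a re-resolution is
// requested; the test side blocks on that signal with a timeout, so a missed
// re-resolution fails the test instead of hanging it.
class ReresolutionWaiter {
 public:
  void SignalReresolutionRequested() {
    std::lock_guard<std::mutex> lock(mu_);
    requested_ = true;
    cv_.notify_all();
  }
  bool WaitForReresolutionRequest(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mu_);
    return cv_.wait_for(lock, timeout, [this] { return requested_; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool requested_ = false;
};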
   EXPECT_EQ(10U, backends_[1]->service_.request_count());
-
-  balancers_[0]->service_.NotifyDoneWithServerlists();
-  balancers_[1]->service_.NotifyDoneWithServerlists();
-  balancers_[2]->service_.NotifyDoneWithServerlists();
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[0]->service_.response_count());
-  EXPECT_EQ(0U, balancers_[1]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[1]->service_.response_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+  balancer_->service_.ShutdownStream();
+  EXPECT_EQ(1U, balancer_->service_.request_count());
+  EXPECT_EQ(0U, balancer_->service_.response_count());
 }
 
-// TODO(juanlishen): Should be removed when the first response is always the
-// initial response. Currently, if client load reporting is not enabled, the
-// balancer doesn't send initial response. When the backend shuts down, an
-// unexpected re-resolution will happen. This test configuration is a workaround
-// for test ReresolveDeadBalancer.
-class UpdatesWithClientLoadReportingTest : public GrpclbEnd2endTest {
- public:
-  UpdatesWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 3, 2) {}
-};
-
-TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
+TEST_F(GrpclbEnd2endTest, ReresolveDeadBalancer) {
+  CreateBackends(2);
   const std::vector<int> first_backend{GetBackendPorts()[0]};
   const std::vector<int> second_backend{GetBackendPorts()[1]};
-  ScheduleResponseForBalancer(0, BuildResponseForBackends(first_backend, {}),
-                              0);
-  ScheduleResponseForBalancer(1, BuildResponseForBackends(second_backend, {}),
-                              0);
-
-  // Ask channel to connect to trigger resolver creation.
-  channel_->GetState(true);
-  std::vector<AddressData> addresses;
-  addresses.emplace_back(AddressData{balancers_[0]->port_, ""});
-  SetNextResolution(addresses);
-  addresses.clear();
-  addresses.emplace_back(AddressData{balancers_[1]->port_, ""});
-  SetNextReresolutionResponse(addresses);
+  SendBalancerResponse(BuildResponseForBackends(first_backend, {}));
+  auto balancer2 = CreateAndStartBalancer();
+  balancer2->service_.SendResponse(
+      BuildResponseForBackends(second_backend, {}));
+
+  SetNextResolutionDefaultBalancer();
 
   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -1770,28 +1496,24 @@ TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
   // All 10 requests should have gone to the first backend.
   EXPECT_EQ(10U, backends_[0]->service_.request_count());
 
-  // Kill backend 0.
-  gpr_log(GPR_INFO, "********** ABOUT TO KILL BACKEND 0 *************");
-  backends_[0]->Shutdown();
-  gpr_log(GPR_INFO, "********** KILLED BACKEND 0 *************");
-
-  CheckRpcSendFailure();
-
-  // Balancer 0 got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
-  // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
-  EXPECT_EQ(0U, balancers_[1]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[1]->service_.response_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+  // Balancer 0 got a single request and sent a single response.
+  EXPECT_EQ(1U, balancer_->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
+  EXPECT_EQ(0U, balancer2->service_.request_count());
+  EXPECT_EQ(0U, balancer2->service_.response_count());
 
   // Kill balancer 0.
   gpr_log(GPR_INFO, "********** ABOUT TO KILL BALANCER 0 *************");
-  balancers_[0]->Shutdown();
+  balancer_->Shutdown();
   gpr_log(GPR_INFO, "********** KILLED BALANCER 0 *************");
 
-  // Wait until re-resolution has finished, as signaled by the second backend
+  // This should trigger a re-resolution.
+  EXPECT_TRUE(response_generator_->WaitForReresolutionRequest(
+      absl::Seconds(5 * grpc_test_slowdown_factor())));
+  gpr_log(GPR_INFO, "********** SAW RE-RESOLUTION REQUEST *************");
+  // Re-resolution result switches to a new balancer.
+  SetNextResolution({balancer2->port_});
+  // Wait until re-resolution has been seen, as signaled by the second backend
   // receiving a request.
   WaitForBackend(1);
 
@@ -1802,42 +1524,31 @@ TEST_F(UpdatesWithClientLoadReportingTest, ReresolveDeadBalancer) {
   // All 10 requests should have gone to the second backend.
   EXPECT_EQ(10U, backends_[1]->service_.request_count());
 
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
-  // After balancer 0 is killed, we restart an LB call immediately (because we
-  // disconnect to a previously connected balancer). Although we will cancel
-  // this call when the re-resolution update is done and another LB call restart
-  // is needed, this old call may still succeed reaching the LB server if
-  // re-resolution is slow. So balancer 1 may have received 2 requests and sent
-  // 2 responses.
-  EXPECT_GE(balancers_[1]->service_.request_count(), 1U);
-  EXPECT_GE(balancers_[1]->service_.response_count(), 1U);
-  EXPECT_LE(balancers_[1]->service_.request_count(), 2U);
-  EXPECT_LE(balancers_[1]->service_.response_count(), 2U);
-  EXPECT_EQ(0U, balancers_[2]->service_.request_count());
-  EXPECT_EQ(0U, balancers_[2]->service_.response_count());
+  // First and second balancer should each have handled one request and
+  // sent one response.
+  EXPECT_EQ(1U, balancer_->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
+  EXPECT_EQ(1U, balancer2->service_.request_count());
+  EXPECT_EQ(1U, balancer2->service_.response_count());
 }
 
-TEST_F(SingleBalancerTest, Drop) {
-  SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, Drop) {
   const size_t kNumRpcsPerAddress = 100;
-  const int num_of_drop_by_rate_limiting_addresses = 1;
-  const int num_of_drop_by_load_balancing_addresses = 2;
-  const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
-                                    num_of_drop_by_load_balancing_addresses;
-  const int num_total_addresses = num_backends_ + num_of_drop_addresses;
-  ScheduleResponseForBalancer(
-      0,
-      BuildResponseForBackends(
-          GetBackendPorts(),
-          {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
-           {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
-      0);
+  const size_t kNumBackends = 2;
+  const int kNumDropRateLimiting = 1;
+  const int kNumDropLoadBalancing = 2;
+  const int kNumDropTotal = kNumDropRateLimiting + kNumDropLoadBalancing;
+  const int kNumAddressesTotal = kNumBackends + kNumDropTotal;
+  SetNextResolutionDefaultBalancer();
+  CreateBackends(kNumBackends);
+  SendBalancerResponse(BuildResponseForBackends(
+      GetBackendPorts(), {{"rate_limiting", kNumDropRateLimiting},
                          {"load_balancing", kNumDropLoadBalancing}}));
   // Wait until all backends are ready.
   WaitForAllBackends();
   // Send kNumRpcsPerAddress RPCs for each server and drop address.
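The arithmetic behind the Drop assertions in the next hunk is worth spelling out: the serverlist interleaves the 2 real backends with 3 drop entries, so round-robin directs exactly kNumRpcsPerAddress picks to each of the 5 entries. A standalone sketch of that bookkeeping, using the same constants as the test above (this snippet is illustrative, not part of the test suite):

#include <cstddef>
#include <iostream>

int main() {
  // Same constants as the Drop test above.
  const size_t kNumRpcsPerAddress = 100;
  const size_t kNumBackends = 2;
  const size_t kNumDropTotal = 1 + 2;  // rate_limiting + load_balancing
  const size_t kNumAddressesTotal = kNumBackends + kNumDropTotal;  // 5
  // Round-robin over 5 entries: 500 RPCs total, 300 dropped, and each real
  // backend serves exactly kNumRpcsPerAddress = 100 requests.
  std::cout << "total RPCs:     " << kNumRpcsPerAddress * kNumAddressesTotal
            << "\nexpected drops: " << kNumRpcsPerAddress * kNumDropTotal
            << "\nper backend:    " << kNumRpcsPerAddress << "\n";
  return 0;
}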
   size_t num_drops = 0;
-  for (size_t i = 0; i < kNumRpcsPerAddress * num_total_addresses; ++i) {
+  for (size_t i = 0; i < kNumRpcsPerAddress * kNumAddressesTotal; ++i) {
     EchoResponse response;
     const Status status = SendRpc(&response);
     if (!status.ok() &&
@@ -1846,52 +1557,41 @@ TEST_F(SingleBalancerTest, Drop) {
     } else {
       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                                << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage_);
+      EXPECT_EQ(response.message(), kRequestMessage);
     }
   }
-  EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
+  EXPECT_EQ(kNumRpcsPerAddress * kNumDropTotal, num_drops);
   // Each backend should have gotten 100 requests.
   for (size_t i = 0; i < backends_.size(); ++i) {
     EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
   }
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
 }
 
-TEST_F(SingleBalancerTest, DropAllFirst) {
-  SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, DropAllFirst) {
+  SetNextResolutionDefaultBalancer();
   // All registered addresses are marked as "drop".
-  const int num_of_drop_by_rate_limiting_addresses = 1;
-  const int num_of_drop_by_load_balancing_addresses = 1;
-  ScheduleResponseForBalancer(
-      0,
-      BuildResponseForBackends(
-          {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
-               {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
-      0);
+  const int kNumDropRateLimiting = 1;
+  const int kNumDropLoadBalancing = 1;
+  SendBalancerResponse(BuildResponseForBackends(
+      {}, {{"rate_limiting", kNumDropRateLimiting},
           {"load_balancing", kNumDropLoadBalancing}}));
   const Status status = SendRpc(nullptr, 3000, true);
   EXPECT_FALSE(status.ok());
   EXPECT_EQ(status.error_message(), "drop directed by grpclb balancer");
 }
 
-TEST_F(SingleBalancerTest, DropAll) {
-  SetNextResolutionAllBalancers();
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
-  const int num_of_drop_by_rate_limiting_addresses = 1;
-  const int num_of_drop_by_load_balancing_addresses = 1;
-  ScheduleResponseForBalancer(
-      0,
-      BuildResponseForBackends(
-          {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
-               {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
-      1000);
-
-  // First call succeeds.
+TEST_F(GrpclbEnd2endTest, DropAll) {
+  CreateBackends(1);
+  SetNextResolutionDefaultBalancer();
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   CheckRpcSendOk();
-  // But eventually, the update with only dropped servers is processed and calls
+  SendBalancerResponse(BuildResponseForBackends(
+      {}, {{"rate_limiting", 1}, {"load_balancing", 1}}));
+  // Eventually, the update with only dropped servers is processed, and calls
   // fail.
   Status status;
   do {
@@ -1901,66 +1601,61 @@ TEST_F(SingleBalancerTest, DropAll) {
   EXPECT_EQ(status.error_message(), "drop directed by grpclb balancer");
 }
 
-class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
- public:
-  SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 3) {}
-};
-
-TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
-  SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, ClientLoadReporting) {
+  const size_t kNumBackends = 3;
+  CreateBackends(kNumBackends);
+  balancer_->service_.set_client_load_reporting_interval_seconds(3);
+  SetNextResolutionDefaultBalancer();
   const size_t kNumRpcsPerAddress = 100;
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
+  SendBalancerResponse(BuildResponseForBackends(GetBackendPorts(), {}));
   // Wait until all backends are ready.
   int num_ok = 0;
   int num_failure = 0;
   int num_drops = 0;
   std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
   // Send kNumRpcsPerAddress RPCs per server.
-  CheckRpcSendOk(kNumRpcsPerAddress * num_backends_);
+  CheckRpcSendOk(kNumRpcsPerAddress * kNumBackends);
   // Each backend should have gotten 100 requests.
   for (size_t i = 0; i < backends_.size(); ++i) {
     EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
   }
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
-
+  EXPECT_EQ(1U, balancer_->service_.response_count());
   ClientStats client_stats;
   do {
     client_stats += WaitForLoadReports();
   } while (client_stats.num_calls_finished !=
-           kNumRpcsPerAddress * num_backends_ + num_ok);
-  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
+           kNumRpcsPerAddress * kNumBackends + num_ok);
+  EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + num_ok,
             client_stats.num_calls_started);
-  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_ok,
+  EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + num_ok,
             client_stats.num_calls_finished);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
-  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + (num_ok + num_drops),
+  EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + (num_ok + num_drops),
             client_stats.num_calls_finished_known_received);
   EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
 }
 
-TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
-  SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, LoadReportingWithBalancerRestart) {
+  const size_t kNumBackends = 4;
   const size_t kNumBackendsFirstPass = 2;
-  const size_t kNumBackendsSecondPass =
-      backends_.size() - kNumBackendsFirstPass;
+  const size_t kNumBackendsSecondPass = kNumBackends - kNumBackendsFirstPass;
+  CreateBackends(kNumBackends);
+  balancer_->service_.set_client_load_reporting_interval_seconds(3);
+  SetNextResolutionDefaultBalancer();
   // Balancer returns the first kNumBackendsFirstPass backends.
-  ScheduleResponseForBalancer(
-      0,
-      BuildResponseForBackends(GetBackendPorts(0, kNumBackendsFirstPass), {}),
-      0);
+  SendBalancerResponse(
+      BuildResponseForBackends(GetBackendPorts(0, kNumBackendsFirstPass), {}));
   // Wait until all backends returned by the balancer are ready.
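In the ClientLoadReporting test above, load reports arrive in chunks, so the test accumulates them with client_stats += WaitForLoadReports() until the finished-call count catches up. An illustrative aggregate mirroring the fields those assertions read (the real ClientStats lives in the test fixture; this sketch only shows why += accumulates across reports):

#include <cstdint>
#include <map>
#include <string>

// Illustrative stand-in for the fixture's ClientStats: each field sums the
// corresponding counters across every load report received so far.
struct ClientStatsSketch {
  uint64_t num_calls_started = 0;
  uint64_t num_calls_finished = 0;
  uint64_t num_calls_finished_with_client_failed_to_send = 0;
  uint64_t num_calls_finished_known_received = 0;
  std::map<std::string, uint64_t> drop_token_counts;

  ClientStatsSketch& operator+=(const ClientStatsSketch& other) {
    num_calls_started += other.num_calls_started;
    num_calls_finished += other.num_calls_finished;
    num_calls_finished_with_client_failed_to_send +=
        other.num_calls_finished_with_client_failed_to_send;
    num_calls_finished_known_received +=
        other.num_calls_finished_known_received;
    // Per-token drop counts merge by key, so drops reported in different
    // load-report intervals end up under the same token.
    for (const auto& p : other.drop_token_counts) {
      drop_token_counts[p.first] += p.second;
    }
    return *this;
  }
};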
   int num_ok = 0;
   int num_failure = 0;
   int num_drops = 0;
   std::tie(num_ok, num_failure, num_drops) =
-      WaitForAllBackends(/* num_requests_multiple_of */ 1, /* start_index */ 0,
-                         /* stop_index */ kNumBackendsFirstPass);
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+      WaitForAllBackends(0, kNumBackendsFirstPass);
+  balancer_->service_.ShutdownStream();
   ClientStats client_stats = WaitForLoadReports();
   EXPECT_EQ(static_cast<size_t>(num_ok), client_stats.num_calls_started);
   EXPECT_EQ(static_cast<size_t>(num_ok), client_stats.num_calls_finished);
@@ -1969,7 +1664,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
             client_stats.num_calls_finished_known_received);
   EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
   // Shut down the balancer.
-  balancers_[0]->Shutdown();
+  balancer_->Shutdown();
   // Send 10 more requests per backend. This will continue using the
   // last serverlist we received from the balancer before it was shut down.
   ResetBackendCounters();
@@ -1979,10 +1674,9 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
     EXPECT_EQ(1UL, backends_[i]->service_.request_count());
   }
   // Now restart the balancer, this time pointing to all backends.
-  balancers_[0]->Start(server_host_);
-  ScheduleResponseForBalancer(
-      0, BuildResponseForBackends(GetBackendPorts(kNumBackendsFirstPass), {}),
-      0);
+  balancer_->Start();
+  SendBalancerResponse(
+      BuildResponseForBackends(GetBackendPorts(kNumBackendsFirstPass), {}));
   // Wait for queries to start going to one of the new backends.
   // This tells us that we're now using the new serverlist.
   do {
@@ -1991,7 +1685,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
            backends_[3]->service_.request_count() == 0);
   // Send one RPC per backend.
   CheckRpcSendOk(kNumBackendsSecondPass);
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // Check client stats.
   client_stats = WaitForLoadReports();
   EXPECT_EQ(kNumBackendsSecondPass + 1, client_stats.num_calls_started);
@@ -2002,31 +1696,31 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
   EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
 }
 
-TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
-  SetNextResolutionAllBalancers();
+TEST_F(GrpclbEnd2endTest, LoadReportingWithDrops) {
+  const size_t kNumBackends = 3;
   const size_t kNumRpcsPerAddress = 3;
-  const int num_of_drop_by_rate_limiting_addresses = 2;
-  const int num_of_drop_by_load_balancing_addresses = 1;
-  const int num_of_drop_addresses = num_of_drop_by_rate_limiting_addresses +
-                                    num_of_drop_by_load_balancing_addresses;
-  const int num_total_addresses = num_backends_ + num_of_drop_addresses;
-  ScheduleResponseForBalancer(
-      0,
-      BuildResponseForBackends(
-          GetBackendPorts(),
-          {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
-           {"load_balancing", num_of_drop_by_load_balancing_addresses}}),
-      0);
+  const int kNumDropRateLimiting = 2;
+  const int kNumDropLoadBalancing = 1;
+  const int kNumDropTotal = kNumDropRateLimiting + kNumDropLoadBalancing;
+  const int kNumAddressesTotal = kNumBackends + kNumDropTotal;
+  CreateBackends(kNumBackends);
+  balancer_->service_.set_client_load_reporting_interval_seconds(3);
+  SetNextResolutionDefaultBalancer();
+  SendBalancerResponse(BuildResponseForBackends(
+      GetBackendPorts(), {{"rate_limiting", kNumDropRateLimiting},
                          {"load_balancing", kNumDropLoadBalancing}}));
   // Wait until all backends are ready.
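The next hunk calls WaitForAllBackends with a WaitForBackendOptions().SetNumRequestsMultipleOf(...) argument, a chainable options object replacing the old positional /* num_requests_multiple_of */ comment-parameter style. A minimal sketch of that builder shape, with assumed member names (the real struct is fixture-internal):

#include <cstddef>

// Assumed shape: setters return *this so options can be built inline at the
// call site, e.g. WaitForBackendOptionsSketch().SetNumRequestsMultipleOf(5).
struct WaitForBackendOptionsSketch {
  size_t num_requests_multiple_of = 1;

  WaitForBackendOptionsSketch& SetNumRequestsMultipleOf(size_t n) {
    num_requests_multiple_of = n;
    return *this;
  }
};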
   int num_warmup_ok = 0;
   int num_warmup_failure = 0;
   int num_warmup_drops = 0;
   std::tie(num_warmup_ok, num_warmup_failure, num_warmup_drops) =
-      WaitForAllBackends(num_total_addresses /* num_requests_multiple_of */);
+      WaitForAllBackends(
+          0, kNumBackends,
+          WaitForBackendOptions().SetNumRequestsMultipleOf(kNumAddressesTotal));
   const int num_total_warmup_requests =
       num_warmup_ok + num_warmup_failure + num_warmup_drops;
   size_t num_drops = 0;
-  for (size_t i = 0; i < kNumRpcsPerAddress * num_total_addresses; ++i) {
+  for (size_t i = 0; i < kNumRpcsPerAddress * kNumAddressesTotal; ++i) {
     EchoResponse response;
     const Status status = SendRpc(&response);
     if (!status.ok() &&
@@ -2035,35 +1729,32 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
     } else {
       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                                << " message=" << status.error_message();
-      EXPECT_EQ(response.message(), kRequestMessage_);
+      EXPECT_EQ(response.message(), kRequestMessage);
     }
   }
-  EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
+  EXPECT_EQ(kNumRpcsPerAddress * kNumDropTotal, num_drops);
   // Each backend should have gotten kNumRpcsPerAddress requests.
   for (size_t i = 0; i < backends_.size(); ++i) {
     EXPECT_EQ(kNumRpcsPerAddress, backends_[i]->service_.request_count());
   }
-  balancers_[0]->service_.NotifyDoneWithServerlists();
+  balancer_->service_.ShutdownStream();
   // The balancer got a single request.
-  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
+  EXPECT_EQ(1U, balancer_->service_.request_count());
   // and sent a single response.
-  EXPECT_EQ(1U, balancers_[0]->service_.response_count());
+  EXPECT_EQ(1U, balancer_->service_.response_count());
   const ClientStats client_stats = WaitForLoadReports();
-  EXPECT_EQ(
-      kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
-      client_stats.num_calls_started);
-  EXPECT_EQ(
-      kNumRpcsPerAddress * num_total_addresses + num_total_warmup_requests,
-      client_stats.num_calls_finished);
+  EXPECT_EQ(kNumRpcsPerAddress * kNumAddressesTotal + num_total_warmup_requests,
+            client_stats.num_calls_started);
+  EXPECT_EQ(kNumRpcsPerAddress * kNumAddressesTotal + num_total_warmup_requests,
+            client_stats.num_calls_finished);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
-  EXPECT_EQ(kNumRpcsPerAddress * num_backends_ + num_warmup_ok,
+  EXPECT_EQ(kNumRpcsPerAddress * kNumBackends + num_warmup_ok,
             client_stats.num_calls_finished_known_received);
   // The number of warmup requests is a multiple of the number of addresses.
   // Therefore, all addresses in the scheduled balancer response are hit the
   // same number of times.
-  const int num_times_drop_addresses_hit =
-      num_warmup_drops / num_of_drop_addresses;
+  const int num_times_drop_addresses_hit = num_warmup_drops / kNumDropTotal;
   EXPECT_THAT(
       client_stats.drop_token_counts,
       ::testing::ElementsAre(