diff --git a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc index df036c15c3b..b6b4272ce85 100644 --- a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc @@ -37,6 +37,7 @@ namespace grpc { namespace testing { namespace { +using ::envoy::config::core::v3::HealthStatus; using ::envoy::extensions::filters::http::stateful_session::v3::StatefulSession; using ::envoy::extensions::filters::network::http_connection_manager::v3:: HttpFilter; @@ -53,22 +54,6 @@ class OverrideHostTest : public XdsEnd2endTest { std::string raw; }; - std::multimap SendRpcGetServerMetadata( - const grpc_core::DebugLocation& debug_location, - const RpcOptions& rpc_options) { - std::multimap server_initial_metadata; - EchoResponse response; - grpc::Status status = - SendRpc(rpc_options, &response, &server_initial_metadata); - EXPECT_TRUE(status.ok()) - << "code=" << status.error_code() - << ", message=" << status.error_message() << "\n" - << debug_location.file() << ":" << debug_location.line(); - EXPECT_EQ(response.message(), kRequestMessage) - << debug_location.file() << ":" << debug_location.line(); - return server_initial_metadata; - } - static absl::optional ParseCookie(absl::string_view header, absl::string_view cookie_name) { std::pair name_value = @@ -86,7 +71,8 @@ class OverrideHostTest : public XdsEnd2endTest { std::string(name_value.second)}); } - static std::vector GetStatefulSessionCookie( + static std::vector> + GetHeadersWithSessionCookie( const std::multimap& server_initial_metadata, absl::string_view cookie_name = kCookieName) { std::vector values; @@ -100,32 +86,69 @@ class OverrideHostTest : public XdsEnd2endTest { EXPECT_THAT(cookie->attributes, ::testing::Contains("HttpOnly")); values.emplace_back(cookie->value); } - return values; + EXPECT_EQ(values.size(), 1); + if (values.size() == 1) { + return {{"cookie", absl::StrFormat("%s=%s", kCookieName, values[0])}}; + } else { + return {}; + } } // Builds a Listener with Fault Injection filter config. If the http_fault // is nullptr, then assign an empty filter config. This filter config is // required to enable the fault injection features. - static Listener BuildListenerWithStatefulSessionFilter() { + Listener BuildListenerWithStatefulSessionFilter() { CookieBasedSessionState cookie_state; cookie_state.mutable_cookie()->set_name(std::string(kCookieName)); StatefulSession stateful_session; stateful_session.mutable_session_state()->mutable_typed_config()->PackFrom( cookie_state); - HttpConnectionManager http_connection_manager; - Listener listener; - listener.set_name(kServerName); - HttpFilter* session_filter = http_connection_manager.add_http_filters(); + // HttpConnectionManager http_connection_manager; + Listener listener = default_listener_; + HttpConnectionManager http_connection_manager = + ClientHcmAccessor().Unpack(listener); + // Insert new filter ahead of the existing router filter. + HttpFilter* session_filter = + http_connection_manager.mutable_http_filters(0); + *http_connection_manager.add_http_filters() = *session_filter; session_filter->set_name("envoy.stateful_session"); session_filter->mutable_typed_config()->PackFrom(stateful_session); - HttpFilter* router_filter = http_connection_manager.add_http_filters(); - router_filter->set_name("router"); - router_filter->mutable_typed_config()->PackFrom( - envoy::extensions::filters::http::router::v3::Router()); - listener.mutable_api_listener()->mutable_api_listener()->PackFrom( - http_connection_manager); + ClientHcmAccessor().Pack(http_connection_manager, &listener); return listener; } + + std::vector> + GetAffinityCookieHeaderForBackend(grpc_core::DebugLocation debug_location, + size_t backend_index, + RpcOptions rpc_options = RpcOptions()) { + EXPECT_LT(backend_index, backends_.size()); + if (backend_index >= backends_.size()) { + return {}; + } + const auto& backend = backends_[backend_index]; + for (size_t i = 0; i < backends_.size(); ++i) { + std::multimap server_initial_metadata; + grpc::Status status = + SendRpc(rpc_options, nullptr, &server_initial_metadata); + EXPECT_TRUE(status.ok()) + << "code=" << status.error_code() + << ", message=" << status.error_message() << "\n" + << debug_location.file() << ":" << debug_location.line(); + if (!status.ok()) { + return {}; + } + size_t count = backend->backend_service()->request_count() + + backend->backend_service1()->request_count() + + backend->backend_service2()->request_count(); + ResetBackendCounters(); + if (count == 1) { + return GetHeadersWithSessionCookie(server_initial_metadata); + } + } + ADD_FAILURE_AT(debug_location.file(), debug_location.line()) + << "Desired backend had not been hit"; + return {}; + } }; INSTANTIATE_TEST_SUITE_P(XdsTest, OverrideHostTest, @@ -136,30 +159,17 @@ TEST_P(OverrideHostTest, HappyPath) { SetListenerAndRouteConfiguration(balancer_.get(), BuildListenerWithStatefulSessionFilter(), default_route_config_); - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); - balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", + {CreateEndpoint(0, HealthStatus::HEALTHY), + CreateEndpoint(1, HealthStatus::UNKNOWN)}}}))); WaitForAllBackends(DEBUG_LOCATION); - // First call gets the cookie. RR policy picks the backend we will use. - auto server_initial_metadata = - SendRpcGetServerMetadata(DEBUG_LOCATION, RpcOptions()); - std::vector stateful_session_cookie = - GetStatefulSessionCookie(server_initial_metadata); - ASSERT_EQ(stateful_session_cookie.size(), 1); - size_t backend_idx = -1; - for (size_t i = 0; i < backends_.size(); ++i) { - if (backends_[i]->backend_service()->request_count() == 1) { - backend_idx = i; - break; - } - } - ASSERT_NE(-1, backend_idx); - ResetBackendCounters(); - std::vector> session_cookie = { - {"cookie", - absl::StrFormat("%s=%s", kCookieName, stateful_session_cookie[0])}}; + // Get cookie for backend #0. + auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 0); + ASSERT_FALSE(session_cookie.empty()); // All requests go to the backend we specified CheckRpcSendOk(DEBUG_LOCATION, 5, RpcOptions().set_metadata(session_cookie)); - EXPECT_EQ(backends_[backend_idx]->backend_service()->request_count(), 5); + EXPECT_EQ(backends_[0]->backend_service()->request_count(), 5); // Round-robin spreads the load ResetBackendCounters(); CheckRpcSendOk(DEBUG_LOCATION, backends_.size() * 2); @@ -171,7 +181,90 @@ TEST_P(OverrideHostTest, HappyPath) { RpcOptions() .set_metadata(session_cookie) .set_rpc_service(RpcService::SERVICE_ECHO2)); - EXPECT_EQ(backends_[backend_idx]->backend_service2()->request_count(), 5); + EXPECT_EQ(backends_[0]->backend_service2()->request_count(), 5); +} + +TEST_P(OverrideHostTest, DrainingIncludedFromOverrideSet) { + CreateAndStartBackends(3); + Cluster cluster = default_cluster_; + auto* lb_config = cluster.mutable_common_lb_config(); + auto* override_health_status_set = lb_config->mutable_override_host_status(); + override_health_status_set->add_statuses(HealthStatus::HEALTHY); + override_health_status_set->add_statuses(HealthStatus::UNKNOWN); + override_health_status_set->add_statuses(HealthStatus::DRAINING); + balancer_->ads_service()->SetCdsResource(cluster); + SetListenerAndRouteConfiguration(balancer_.get(), + BuildListenerWithStatefulSessionFilter(), + default_route_config_); + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", + {CreateEndpoint(0, HealthStatus::HEALTHY), + CreateEndpoint(1, HealthStatus::HEALTHY)}}}))); + WaitForAllBackends(DEBUG_LOCATION, 0, 2); + CheckRpcSendOk(DEBUG_LOCATION, 4); + EXPECT_EQ(2, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(2, backends_[1]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[2]->backend_service()->request_count()); + ResetBackendCounters(); + // Get cookie for backend #0. + auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 0); + ASSERT_FALSE(session_cookie.empty()); + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", + {CreateEndpoint(0, HealthStatus::DRAINING), + CreateEndpoint(1, HealthStatus::HEALTHY), + CreateEndpoint(2, HealthStatus::HEALTHY)}}}))); + WaitForAllBackends(DEBUG_LOCATION, 2); + // Draining subchannel works when used as an override host. + CheckRpcSendOk(DEBUG_LOCATION, 4, RpcOptions().set_metadata(session_cookie)); + EXPECT_EQ(4, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[1]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[2]->backend_service()->request_count()); + ResetBackendCounters(); + // Round robin does not see the draining backend + CheckRpcSendOk(DEBUG_LOCATION, 4); + EXPECT_EQ(0, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(2, backends_[1]->backend_service()->request_count()); + EXPECT_EQ(2, backends_[2]->backend_service()->request_count()); + ResetBackendCounters(); +} + +TEST_P(OverrideHostTest, DrainingExcludedFromOverrideSet) { + CreateAndStartBackends(3); + Cluster cluster = default_cluster_; + auto* lb_config = cluster.mutable_common_lb_config(); + auto* override_health_status_set = lb_config->mutable_override_host_status(); + override_health_status_set->add_statuses(HealthStatus::HEALTHY); + override_health_status_set->add_statuses(HealthStatus::UNKNOWN); + balancer_->ads_service()->SetCdsResource(cluster); + SetListenerAndRouteConfiguration(balancer_.get(), + BuildListenerWithStatefulSessionFilter(), + default_route_config_); + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", + {CreateEndpoint(0, HealthStatus::HEALTHY), + CreateEndpoint(1, HealthStatus::HEALTHY)}}}))); + WaitForAllBackends(DEBUG_LOCATION, 0, 2); + CheckRpcSendOk(DEBUG_LOCATION, 4); + EXPECT_EQ(2, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(2, backends_[1]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[2]->backend_service()->request_count()); + ResetBackendCounters(); + // Get a cookie for backends_[0]. + auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 0); + ASSERT_FALSE(session_cookie.empty()); + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", + {CreateEndpoint(0, HealthStatus::DRAINING), + CreateEndpoint(1, HealthStatus::HEALTHY), + CreateEndpoint(2, HealthStatus::UNKNOWN)}}}))); + WaitForAllBackends(DEBUG_LOCATION, 2); + // Override for the draining host is not honored, RR is used instead. + CheckRpcSendOk(DEBUG_LOCATION, 4, RpcOptions().set_metadata(session_cookie)); + EXPECT_EQ(0, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(2, backends_[1]->backend_service()->request_count()); + EXPECT_EQ(2, backends_[2]->backend_service()->request_count()); + ResetBackendCounters(); } } // namespace } // namespace testing