stateful session affinity: add e2e test for draining state (#32248)

pull/32278/head
Eugene Ostroukhov 2 years ago committed by GitHub
parent 190d095a62
commit 72872fc29d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 195
      test/cpp/end2end/xds/xds_override_host_end2end_test.cc

@ -37,6 +37,7 @@
namespace grpc {
namespace testing {
namespace {
using ::envoy::config::core::v3::HealthStatus;
using ::envoy::extensions::filters::http::stateful_session::v3::StatefulSession;
using ::envoy::extensions::filters::network::http_connection_manager::v3::
HttpFilter;
@ -53,22 +54,6 @@ class OverrideHostTest : public XdsEnd2endTest {
std::string raw;
};
std::multimap<std::string, std::string> SendRpcGetServerMetadata(
const grpc_core::DebugLocation& debug_location,
const RpcOptions& rpc_options) {
std::multimap<std::string, std::string> server_initial_metadata;
EchoResponse response;
grpc::Status status =
SendRpc(rpc_options, &response, &server_initial_metadata);
EXPECT_TRUE(status.ok())
<< "code=" << status.error_code()
<< ", message=" << status.error_message() << "\n"
<< debug_location.file() << ":" << debug_location.line();
EXPECT_EQ(response.message(), kRequestMessage)
<< debug_location.file() << ":" << debug_location.line();
return server_initial_metadata;
}
static absl::optional<Cookie> ParseCookie(absl::string_view header,
absl::string_view cookie_name) {
std::pair<absl::string_view, absl::string_view> name_value =
@ -86,7 +71,8 @@ class OverrideHostTest : public XdsEnd2endTest {
std::string(name_value.second)});
}
static std::vector<std::string> GetStatefulSessionCookie(
static std::vector<std::pair<std::string, std::string>>
GetHeadersWithSessionCookie(
const std::multimap<std::string, std::string>& server_initial_metadata,
absl::string_view cookie_name = kCookieName) {
std::vector<std::string> values;
@ -100,32 +86,69 @@ class OverrideHostTest : public XdsEnd2endTest {
EXPECT_THAT(cookie->attributes, ::testing::Contains("HttpOnly"));
values.emplace_back(cookie->value);
}
return values;
EXPECT_EQ(values.size(), 1);
if (values.size() == 1) {
return {{"cookie", absl::StrFormat("%s=%s", kCookieName, values[0])}};
} else {
return {};
}
}
// Builds a Listener with Fault Injection filter config. If the http_fault
// is nullptr, then assign an empty filter config. This filter config is
// required to enable the fault injection features.
static Listener BuildListenerWithStatefulSessionFilter() {
Listener BuildListenerWithStatefulSessionFilter() {
CookieBasedSessionState cookie_state;
cookie_state.mutable_cookie()->set_name(std::string(kCookieName));
StatefulSession stateful_session;
stateful_session.mutable_session_state()->mutable_typed_config()->PackFrom(
cookie_state);
HttpConnectionManager http_connection_manager;
Listener listener;
listener.set_name(kServerName);
HttpFilter* session_filter = http_connection_manager.add_http_filters();
// HttpConnectionManager http_connection_manager;
Listener listener = default_listener_;
HttpConnectionManager http_connection_manager =
ClientHcmAccessor().Unpack(listener);
// Insert new filter ahead of the existing router filter.
HttpFilter* session_filter =
http_connection_manager.mutable_http_filters(0);
*http_connection_manager.add_http_filters() = *session_filter;
session_filter->set_name("envoy.stateful_session");
session_filter->mutable_typed_config()->PackFrom(stateful_session);
HttpFilter* router_filter = http_connection_manager.add_http_filters();
router_filter->set_name("router");
router_filter->mutable_typed_config()->PackFrom(
envoy::extensions::filters::http::router::v3::Router());
listener.mutable_api_listener()->mutable_api_listener()->PackFrom(
http_connection_manager);
ClientHcmAccessor().Pack(http_connection_manager, &listener);
return listener;
}
std::vector<std::pair<std::string, std::string>>
GetAffinityCookieHeaderForBackend(grpc_core::DebugLocation debug_location,
size_t backend_index,
RpcOptions rpc_options = RpcOptions()) {
EXPECT_LT(backend_index, backends_.size());
if (backend_index >= backends_.size()) {
return {};
}
const auto& backend = backends_[backend_index];
for (size_t i = 0; i < backends_.size(); ++i) {
std::multimap<std::string, std::string> server_initial_metadata;
grpc::Status status =
SendRpc(rpc_options, nullptr, &server_initial_metadata);
EXPECT_TRUE(status.ok())
<< "code=" << status.error_code()
<< ", message=" << status.error_message() << "\n"
<< debug_location.file() << ":" << debug_location.line();
if (!status.ok()) {
return {};
}
size_t count = backend->backend_service()->request_count() +
backend->backend_service1()->request_count() +
backend->backend_service2()->request_count();
ResetBackendCounters();
if (count == 1) {
return GetHeadersWithSessionCookie(server_initial_metadata);
}
}
ADD_FAILURE_AT(debug_location.file(), debug_location.line())
<< "Desired backend had not been hit";
return {};
}
};
INSTANTIATE_TEST_SUITE_P(XdsTest, OverrideHostTest,
@ -136,30 +159,17 @@ TEST_P(OverrideHostTest, HappyPath) {
SetListenerAndRouteConfiguration(balancer_.get(),
BuildListenerWithStatefulSessionFilter(),
default_route_config_);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0",
{CreateEndpoint(0, HealthStatus::HEALTHY),
CreateEndpoint(1, HealthStatus::UNKNOWN)}}})));
WaitForAllBackends(DEBUG_LOCATION);
// First call gets the cookie. RR policy picks the backend we will use.
auto server_initial_metadata =
SendRpcGetServerMetadata(DEBUG_LOCATION, RpcOptions());
std::vector<std::string> stateful_session_cookie =
GetStatefulSessionCookie(server_initial_metadata);
ASSERT_EQ(stateful_session_cookie.size(), 1);
size_t backend_idx = -1;
for (size_t i = 0; i < backends_.size(); ++i) {
if (backends_[i]->backend_service()->request_count() == 1) {
backend_idx = i;
break;
}
}
ASSERT_NE(-1, backend_idx);
ResetBackendCounters();
std::vector<std::pair<std::string, std::string>> session_cookie = {
{"cookie",
absl::StrFormat("%s=%s", kCookieName, stateful_session_cookie[0])}};
// Get cookie for backend #0.
auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 0);
ASSERT_FALSE(session_cookie.empty());
// All requests go to the backend we specified
CheckRpcSendOk(DEBUG_LOCATION, 5, RpcOptions().set_metadata(session_cookie));
EXPECT_EQ(backends_[backend_idx]->backend_service()->request_count(), 5);
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 5);
// Round-robin spreads the load
ResetBackendCounters();
CheckRpcSendOk(DEBUG_LOCATION, backends_.size() * 2);
@ -171,7 +181,90 @@ TEST_P(OverrideHostTest, HappyPath) {
RpcOptions()
.set_metadata(session_cookie)
.set_rpc_service(RpcService::SERVICE_ECHO2));
EXPECT_EQ(backends_[backend_idx]->backend_service2()->request_count(), 5);
EXPECT_EQ(backends_[0]->backend_service2()->request_count(), 5);
}
TEST_P(OverrideHostTest, DrainingIncludedFromOverrideSet) {
CreateAndStartBackends(3);
Cluster cluster = default_cluster_;
auto* lb_config = cluster.mutable_common_lb_config();
auto* override_health_status_set = lb_config->mutable_override_host_status();
override_health_status_set->add_statuses(HealthStatus::HEALTHY);
override_health_status_set->add_statuses(HealthStatus::UNKNOWN);
override_health_status_set->add_statuses(HealthStatus::DRAINING);
balancer_->ads_service()->SetCdsResource(cluster);
SetListenerAndRouteConfiguration(balancer_.get(),
BuildListenerWithStatefulSessionFilter(),
default_route_config_);
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0",
{CreateEndpoint(0, HealthStatus::HEALTHY),
CreateEndpoint(1, HealthStatus::HEALTHY)}}})));
WaitForAllBackends(DEBUG_LOCATION, 0, 2);
CheckRpcSendOk(DEBUG_LOCATION, 4);
EXPECT_EQ(2, backends_[0]->backend_service()->request_count());
EXPECT_EQ(2, backends_[1]->backend_service()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
ResetBackendCounters();
// Get cookie for backend #0.
auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 0);
ASSERT_FALSE(session_cookie.empty());
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0",
{CreateEndpoint(0, HealthStatus::DRAINING),
CreateEndpoint(1, HealthStatus::HEALTHY),
CreateEndpoint(2, HealthStatus::HEALTHY)}}})));
WaitForAllBackends(DEBUG_LOCATION, 2);
// Draining subchannel works when used as an override host.
CheckRpcSendOk(DEBUG_LOCATION, 4, RpcOptions().set_metadata(session_cookie));
EXPECT_EQ(4, backends_[0]->backend_service()->request_count());
EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
ResetBackendCounters();
// Round robin does not see the draining backend
CheckRpcSendOk(DEBUG_LOCATION, 4);
EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
EXPECT_EQ(2, backends_[1]->backend_service()->request_count());
EXPECT_EQ(2, backends_[2]->backend_service()->request_count());
ResetBackendCounters();
}
TEST_P(OverrideHostTest, DrainingExcludedFromOverrideSet) {
CreateAndStartBackends(3);
Cluster cluster = default_cluster_;
auto* lb_config = cluster.mutable_common_lb_config();
auto* override_health_status_set = lb_config->mutable_override_host_status();
override_health_status_set->add_statuses(HealthStatus::HEALTHY);
override_health_status_set->add_statuses(HealthStatus::UNKNOWN);
balancer_->ads_service()->SetCdsResource(cluster);
SetListenerAndRouteConfiguration(balancer_.get(),
BuildListenerWithStatefulSessionFilter(),
default_route_config_);
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0",
{CreateEndpoint(0, HealthStatus::HEALTHY),
CreateEndpoint(1, HealthStatus::HEALTHY)}}})));
WaitForAllBackends(DEBUG_LOCATION, 0, 2);
CheckRpcSendOk(DEBUG_LOCATION, 4);
EXPECT_EQ(2, backends_[0]->backend_service()->request_count());
EXPECT_EQ(2, backends_[1]->backend_service()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
ResetBackendCounters();
// Get a cookie for backends_[0].
auto session_cookie = GetAffinityCookieHeaderForBackend(DEBUG_LOCATION, 0);
ASSERT_FALSE(session_cookie.empty());
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0",
{CreateEndpoint(0, HealthStatus::DRAINING),
CreateEndpoint(1, HealthStatus::HEALTHY),
CreateEndpoint(2, HealthStatus::UNKNOWN)}}})));
WaitForAllBackends(DEBUG_LOCATION, 2);
// Override for the draining host is not honored, RR is used instead.
CheckRpcSendOk(DEBUG_LOCATION, 4, RpcOptions().set_metadata(session_cookie));
EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
EXPECT_EQ(2, backends_[1]->backend_service()->request_count());
EXPECT_EQ(2, backends_[2]->backend_service()->request_count());
ResetBackendCounters();
}
} // namespace
} // namespace testing

Loading…
Cancel
Save