xds/interop: basic header based affinity test (#26759)

* affinity test

- most basic affinity test
  - verify that the received RDS and CDS are correctly configured for affinity
  - verify that all RPCs are only sent to the one backend
  - verify that only one sub-channel is connected, the other 2 are IDLE

And infra changes:
- add argument to set affinity config when creating backend service
- add a new backend service "affinity" to be shared by all affinity tests
  - this backend service is configured to do header affinity
  - it has 3 endpoints
- replica support copied from PR https://github.com/grpc/grpc/pull/26360
- update backend services from GRPC to HTTP2, to disable validate-for-proxyless
  - this will be reverted later
- add channelz function to query subchannels
- add method to configure the initial RPC config (RPC types and RPC metadata) when creating the client
- set env var to enable RING_HASH support

* c1

* REVERT THIS: update strategy to trigger a manual build

* config: suffix to prefix

* Revert "REVERT THIS: update strategy to trigger a manual build"

This reverts commit 830776fef9.
pull/26566/head^2
Menghan Li 4 years ago committed by GitHub
parent 3104a9964c
commit ee17927fce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      tools/run_tests/xds_k8s_test_driver/config/url-map.cfg
  2. 25
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
  3. 75
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
  4. 15
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
  5. 75
      tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
  6. 48
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py
  7. 32
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py
  8. 3
      tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/client.deployment.yaml
  9. 107
      tools/run_tests/xds_k8s_test_driver/tests/url_map/affinity_test.py

@ -1,4 +1,4 @@
--resource_suffix=interop-psm-url-map
--resource_prefix=interop-psm-url-map
--strategy=reuse
# TODO(lidiz): Remove the next line when xds port randomization supported.
--server_xds_port=8848

@ -112,18 +112,25 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
self,
name: str,
health_check: GcpResource,
affinity_header: str = None,
protocol: Optional[BackendServiceProtocol] = None) -> GcpResource:
if not isinstance(protocol, self.BackendServiceProtocol):
raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
return self._insert_resource(
self.api.backendServices(),
{
'name': name,
'loadBalancingScheme':
'INTERNAL_SELF_MANAGED', # Traffic Director
'healthChecks': [health_check.url],
'protocol': protocol.name,
})
body = {
'name': name,
'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', # Traffic Director
'healthChecks': [health_check.url],
'protocol': protocol.name,
}
# If an affinity header is specified, configure the backend service to
# support affinity, and set the affinity header to the one given.
if affinity_header:
body['sessionAffinity'] = 'HEADER_FIELD'
body['localityLbPolicy'] = 'RING_HASH'
body['consistentHash'] = {
'httpHeaderName': affinity_header,
}
return self._insert_resource(self.api.backendServices(), body)
def get_backend_service_traffic_director(self, name: str) -> GcpResource:
return self._get_resource(self.api.backendServices(),

@ -40,6 +40,9 @@ ClientTlsPolicy = _NetworkSecurityV1Alpha1.ClientTlsPolicy
_NetworkServicesV1Alpha1 = gcp.network_services.NetworkServicesV1Alpha1
EndpointConfigSelector = _NetworkServicesV1Alpha1.EndpointConfigSelector
# Testing metadata consts
TEST_AFFINITY_METADATA_KEY = 'xds_md'
class TrafficDirectorManager:
compute: _ComputeV1
@ -48,6 +51,7 @@ class TrafficDirectorManager:
BACKEND_SERVICE_NAME = "backend-service"
ALTERNATIVE_BACKEND_SERVICE_NAME = "backend-service-alt"
AFFINITY_BACKEND_SERVICE_NAME = "backend-service-affinity"
HEALTH_CHECK_NAME = "health-check"
URL_MAP_NAME = "url-map"
URL_MAP_PATH_MATCHER_NAME = "path-matcher"
@ -90,6 +94,11 @@ class TrafficDirectorManager:
self.alternative_backend_service_protocol: Optional[
BackendServiceProtocol] = None
self.alternative_backends: Set[ZonalGcpResource] = set()
self.affinity_backend_service: Optional[GcpResource] = None
# TODO(sergiitk): remove this flag once backend service resource loaded
self.affinity_backend_service_protocol: Optional[
BackendServiceProtocol] = None
self.affinity_backends: Set[ZonalGcpResource] = set()
@property
def network_url(self):
@ -122,13 +131,12 @@ class TrafficDirectorManager:
def cleanup(self, *, force=False):
# Cleanup in the reverse order of creation
self.delete_forwarding_rule(force=force)
if self.target_proxy_is_http:
self.delete_target_http_proxy(force=force)
else:
self.delete_target_grpc_proxy(force=force)
self.delete_target_http_proxy(force=force)
self.delete_target_grpc_proxy(force=force)
self.delete_url_map(force=force)
self.delete_backend_service(force=force)
self.delete_alternative_backend_service(force=force)
self.delete_affinity_backend_service(force=force)
self.delete_health_check(force=force)
@functools.lru_cache(None)
@ -280,6 +288,65 @@ class TrafficDirectorManager:
self.compute.wait_for_backends_healthy_status(
self.alternative_backend_service, self.alternative_backends)
def create_affinity_backend_service(
        self, protocol: Optional[BackendServiceProtocol] = _BackendGRPC):
    """Create the backend service used by the header-affinity tests.

    The service is configured by the compute API helper with header-based
    affinity keyed on TEST_AFFINITY_METADATA_KEY.
    """
    # Callers may pass None explicitly; treat that the same as the default.
    if protocol is None:
        protocol = _BackendGRPC
    service_name = self.make_resource_name(self.AFFINITY_BACKEND_SERVICE_NAME)
    logger.info('Creating %s Affinity Backend Service "%s"', protocol.name,
                service_name)
    backend_service = self.compute.create_backend_service_traffic_director(
        service_name,
        health_check=self.health_check,
        protocol=protocol,
        affinity_header=TEST_AFFINITY_METADATA_KEY)
    self.affinity_backend_service = backend_service
    self.affinity_backend_service_protocol = protocol
def load_affinity_backend_service(self):
    """Look up the existing affinity backend service and cache its handle."""
    service_name = self.make_resource_name(self.AFFINITY_BACKEND_SERVICE_NAME)
    self.affinity_backend_service = (
        self.compute.get_backend_service_traffic_director(service_name))
def delete_affinity_backend_service(self, force=False):
    """Delete the affinity backend service.

    With force=True the name is derived from the naming convention even
    when no handle is cached; otherwise the call is a no-op unless a
    handle was previously created or loaded.
    """
    if force:
        target_name = self.make_resource_name(
            self.AFFINITY_BACKEND_SERVICE_NAME)
    else:
        if not self.affinity_backend_service:
            return
        target_name = self.affinity_backend_service.name
    logger.info('Deleting Affinity Backend Service "%s"', target_name)
    self.compute.delete_backend_service(target_name)
    self.affinity_backend_service = None
def affinity_backend_service_add_neg_backends(self, name, zones):
    """Wait for the NEG in each zone, collect it, then attach all backends."""
    logger.info('Waiting for Network Endpoint Groups to load endpoints.')
    for zone in zones:
        neg = self.compute.wait_for_network_endpoint_group(name, zone)
        logger.info('Loaded NEG "%s" in zone %s', neg.name,
                    neg.zone)
        self.affinity_backends.add(neg)
    self.affinity_backend_service_add_backends()
def affinity_backend_service_add_backends(self):
    """Attach all collected affinity NEG backends to the backend service."""
    # Use the module-level logger for consistency with the sibling affinity
    # helpers; the original called logging.info on the root logger.
    logger.info('Adding backends to Backend Service %s: %r',
                self.affinity_backend_service.name, self.affinity_backends)
    self.compute.backend_service_add_backends(self.affinity_backend_service,
                                              self.affinity_backends)
def affinity_backend_service_remove_all_backends(self):
    """Detach every backend from the affinity backend service."""
    # Use the module-level logger for consistency with the sibling affinity
    # helpers; the original called logging.info on the root logger.
    logger.info('Removing backends from Backend Service %s',
                self.affinity_backend_service.name)
    self.compute.backend_service_remove_all_backends(
        self.affinity_backend_service)
def wait_for_affinity_backends_healthy_status(self):
    """Block until every backend of the affinity service reports healthy."""
    service = self.affinity_backend_service
    backends = self.affinity_backends
    logger.debug(
        "Waiting for Backend Service %s to report all backends healthy %r",
        service, backends)
    self.compute.wait_for_backends_healthy_status(service, backends)
def create_url_map(
self,
src_host: str,

@ -20,7 +20,7 @@ modules.
import datetime
import functools
import logging
from typing import Iterable, Optional, Tuple
from typing import Iterable, Optional, Tuple, List
from framework.helpers import retryers
from framework.infrastructure import gcp
@ -213,6 +213,17 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
f'Not found a {_ChannelzChannelState.Name(state)} '
f'subchannel for channel_id {channel.ref.channel_id}')
def find_subchannels_with_state(self, state: _ChannelzChannelState,
                                **kwargs) -> List[_ChannelzSubchannel]:
    """Return all subchannels in the given state for this client's target.

    Args:
        state: Channelz channel state to filter subchannels by.
        **kwargs: Passed through to the channelz query helpers.

    Returns:
        Every matching subchannel across all channels whose target is
        self.server_target.
    """
    subchannels = []
    for channel in self.channelz.find_channels_for_target(
            self.server_target, **kwargs):
        for subchannel in self.channelz.list_channel_subchannels(
                channel, **kwargs):
            # Compare with == rather than `is`: the channelz state values
            # are proto enum ints, and identity comparison only succeeds
            # by the accident of CPython's small-int caching.
            if subchannel.data.state.state == state:
                subchannels.append(subchannel)
    return subchannels
class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
@ -266,6 +277,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
server_target,
rpc='UnaryCall',
qps=25,
metadata='',
secure_mode=False,
print_response=False) -> XdsTestClient:
super().run()
@ -299,6 +311,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
server_target=server_target,
rpc=rpc,
qps=qps,
metadata=metadata,
secure_mode=secure_mode,
print_response=print_response)

@ -19,6 +19,7 @@ modules.
"""
import functools
import logging
import threading
from typing import Iterator, Optional
from framework.infrastructure import gcp
@ -48,7 +49,8 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
server_id: Optional[str] = None,
xds_host: Optional[str] = None,
xds_port: Optional[int] = None,
rpc_host: Optional[str] = None):
rpc_host: Optional[str] = None,
pod_name: Optional[str] = None):
super().__init__(rpc_host=(rpc_host or ip))
self.ip = ip
self.rpc_port = rpc_port
@ -56,6 +58,7 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
self.secure_mode = secure_mode
self.server_id = server_id
self.xds_host, self.xds_port = xds_host, xds_port
self.pod_name = pod_name
@property
@functools.lru_cache(None)
@ -131,6 +134,9 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
DEFAULT_MAINTENANCE_PORT = 8080
DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
_lock = threading.Lock()
_server_port_forwarding_offset = 0
def __init__(self,
k8s_namespace,
*,
@ -184,7 +190,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
self.service: Optional[k8s.V1Service] = None
self.port_forwarder = None
self.port_forwarders = []
def run(self,
*,
@ -192,11 +198,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
maintenance_port=None,
secure_mode=False,
server_id=None,
replica_count=1) -> XdsTestServer:
# TODO(sergiitk): multiple replicas
if replica_count != 1:
raise NotImplementedError("Multiple replicas not yet supported")
replica_count=1) -> Iterator[XdsTestServer]:
# Implementation detail: in secure mode, maintenance ("backchannel")
# port must be different from the test port so communication with
# maintenance services can be reached independently from the security
@ -267,32 +269,45 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
# Wait for pods running
pods = self.k8s_namespace.list_deployment_pods(self.deployment)
servers = []
for pod in pods:
self._wait_pod_started(pod.metadata.name)
# TODO(sergiitk): This is why multiple replicas not yet supported
pod = pods[0]
pod_ip = pod.status.pod_ip
rpc_host = None
# Experimental, for local debugging.
if self.debug_use_port_forwarding:
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, maintenance_port)
self.port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=maintenance_port)
rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS
return XdsTestServer(ip=pod_ip,
rpc_port=test_port,
maintenance_port=maintenance_port,
secure_mode=secure_mode,
server_id=server_id,
rpc_host=rpc_host)
pod_name = pod.metadata.name
self._wait_pod_started(pod_name)
pod_ip = pod.status.pod_ip
rpc_host = None
# Experimental, for local debugging.
local_port = maintenance_port
if self.debug_use_port_forwarding:
with KubernetesServerRunner._lock:
local_port = maintenance_port + KubernetesServerRunner._server_port_forwarding_offset
KubernetesServerRunner._server_port_forwarding_offset += 1
logger.info(
'LOCAL DEV MODE: Enabling port forwarding to %s:%s using local port %s',
pod_ip, maintenance_port, local_port)
self.port_forwarders.append(
self.k8s_namespace.port_forward_pod(
pod,
remote_port=maintenance_port,
local_port=local_port))
rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS
servers.append(
XdsTestServer(ip=pod_ip,
rpc_port=test_port,
maintenance_port=local_port,
secure_mode=secure_mode,
server_id=server_id,
rpc_host=rpc_host,
pod_name=pod_name))
return servers
def cleanup(self, *, force=False, force_namespace=False):
if self.port_forwarder:
self.k8s_namespace.port_forward_stop(self.port_forwarder)
self.port_forwarder = None
if self.port_forwarders:
for port_forwarder in self.port_forwarders:
self.k8s_namespace.port_forward_stop(port_forwarder)
self.port_forwarders = []
if self.deployment or force:
self._delete_deployment(self.deployment_name)
self.deployment = None

@ -41,6 +41,7 @@ UrlMapType = Any
HostRule = Any
PathMatcher = Any
_BackendHTTP2 = gcp.compute.ComputeV1.BackendServiceProtocol.HTTP2
_COMPUTE_V1_URL_PREFIX = 'https://www.googleapis.com/compute/v1'
@ -185,6 +186,16 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
td_bootstrap_image=self.td_bootstrap_image,
network=self.network,
reuse_namespace=True)
self.test_server_affinity_runner = server_app.KubernetesServerRunner(
self.k8s_namespace,
deployment_name=self.server_name + '-affinity',
image_name=self.server_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
network=self.network,
reuse_namespace=True)
logging.info('Strategy of GCP resources management: %s', self.strategy)
def _pre_cleanup(self):
@ -209,8 +220,16 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
# Health Checks
self.td.create_health_check()
# Backend Services
self.td.create_backend_service()
self.td.create_alternative_backend_service()
#
# The backend services are created with HTTP2 instead of GRPC (the
# default) to disable validate-for-proxyless, because the affinity
# settings are not accepted by RCTH yet.
#
# TODO: delete _BackendHTTP2 from the parameters, to use the default
# GRPC.
self.td.create_backend_service(_BackendHTTP2)
self.td.create_alternative_backend_service(_BackendHTTP2)
self.td.create_affinity_backend_service(_BackendHTTP2)
# Construct UrlMap from test classes
aggregator = _UrlMapChangeAggregator(
url_map_name=self.td.make_resource_name(self.td.URL_MAP_NAME))
@ -231,18 +250,30 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
self.test_server_alternative_runner.run(
test_port=self.server_port,
maintenance_port=self.server_maintenance_port)
# Kubernetes Test Server Affinity. 3 endpoints to test that only the
# picked sub-channel is connected.
self.test_server_affinity_runner.run(
test_port=self.server_port,
maintenance_port=self.server_maintenance_port,
replica_count=3)
# Add backend to default backend service
neg_name, neg_zones = self.k8s_namespace.get_service_neg(
self.test_server_runner.service_name, self.server_port)
self.td.backend_service_add_neg_backends(neg_name, neg_zones)
# Add backend to alternative backend service
neg_name, neg_zones = self.k8s_namespace.get_service_neg(
neg_name_alt, neg_zones_alt = self.k8s_namespace.get_service_neg(
self.test_server_alternative_runner.service_name, self.server_port)
self.td.alternative_backend_service_add_neg_backends(
neg_name, neg_zones)
neg_name_alt, neg_zones_alt)
# Add backend to affinity backend service
neg_name_affinity, neg_zones_affinity = self.k8s_namespace.get_service_neg(
self.test_server_affinity_runner.service_name, self.server_port)
self.td.affinity_backend_service_add_neg_backends(
neg_name_affinity, neg_zones_affinity)
# Wait for healthy backends
self.td.wait_for_backends_healthy_status()
self.td.wait_for_alternative_backends_healthy_status()
self.td.wait_for_affinity_backends_healthy_status()
def cleanup(self) -> None:
if self.strategy not in ['create']:
@ -260,6 +291,9 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
if hasattr(self, 'test_server_alternative_runner'):
self.test_server_alternative_runner.cleanup(force=True,
force_namespace=True)
if hasattr(self, 'test_server_affinity_runner'):
self.test_server_affinity_runner.cleanup(force=True,
force_namespace=True)
@functools.lru_cache(None)
def default_backend_service(self) -> str:
@ -272,3 +306,9 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
"""Returns alternative backend service URL."""
self.td.load_alternative_backend_service()
return self.td.alternative_backend_service.url
@functools.lru_cache(None)
def affinity_backend_service(self) -> str:
"""Returns affinity backend service URL."""
self.td.load_affinity_backend_service()
return self.td.affinity_backend_service.url

@ -134,6 +134,7 @@ class RpcDistributionStats:
def __init__(self, json_lb_stats: JsonType):
self.num_failures = json_lb_stats.get('numFailures', 0)
self.num_peers = 0
self.num_oks = 0
self.default_service_rpc_count = 0
self.alternative_service_rpc_count = 0
@ -142,6 +143,8 @@ class RpcDistributionStats:
self.unary_call_alternative_service_rpc_count = 0
self.empty_call_alternative_service_rpc_count = 0
if 'rpcsByPeer' in json_lb_stats:
self.num_peers = len(json_lb_stats['rpcsByPeer'])
if 'rpcsByMethod' in json_lb_stats:
for rpc_type in json_lb_stats['rpcsByMethod']:
for peer in json_lb_stats['rpcsByMethod'][rpc_type][
@ -217,6 +220,28 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
- rpc_distribution_validate: Validates if the routing behavior is correct
"""
@staticmethod
def client_init_config(rpc: str, metadata: str) -> Tuple[str, str]:
"""Updates the initial RPC configs for this test case.
Each test case will start a test client. The client takes RPC configs
and starts to send RPCs immediately. The config returned by this
function will be used to replace the default configs.
The default configs are passed in as arguments, so this method can
modify part of them.
Args:
rpc: The default rpc config, specifying RPCs to send, format
'UnaryCall,EmptyCall'
metadata: The metadata config, specifying metadata to send with each
RPC, format 'EmptyCall:key1:value1,UnaryCall:key2:value2'.
Returns:
A tuple contains the updated rpc and metadata config.
"""
return rpc, metadata
@staticmethod
@abc.abstractmethod
def url_map_change(
@ -278,10 +303,13 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
cls.started_test_cases.add(cls.__name__)
# TODO(lidiz) concurrency is possible, pending multiple-instance change
GcpResourceManager().test_client_runner.cleanup(force=True)
# Sending both RPCs when starting.
# Start the client, and allow the test to override the initial RPC config.
rpc, metadata = cls.client_init_config(rpc="UnaryCall,EmptyCall",
metadata="")
cls.test_client = GcpResourceManager().test_client_runner.run(
server_target=f'xds:///{cls.hostname()}',
rpc='UnaryCall,EmptyCall',
rpc=rpc,
metadata=metadata,
qps=QPS.value,
print_response=True)

@ -28,12 +28,15 @@ spec:
- "--stats_port=${stats_port}"
- "--qps=${qps}"
- "--rpc=${rpc}"
- "--metadata=${metadata}"
- "--print_response=${print_response}"
ports:
- containerPort: ${stats_port}
env:
- name: GRPC_XDS_BOOTSTRAP
value: "/tmp/grpc-xds/td-grpc-bootstrap.json"
- name: GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH
value: "true"
volumeMounts:
- mountPath: /tmp/grpc-xds/
name: grpc-td-conf

@ -0,0 +1,107 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Tuple
from absl import flags
from framework import xds_url_map_testcase
from framework.rpc import grpc_channelz
from framework.test_app import client_app
from framework.infrastructure import traffic_director
# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_url_map_testcase)
_NUM_RPCS = 150
_TEST_METADATA_KEY = traffic_director.TEST_AFFINITY_METADATA_KEY
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
_TEST_METADATA = (
(RpcTypeUnaryCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_UNARY),
(RpcTypeEmptyCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_EMPTY),
(RpcTypeUnaryCall, _TEST_METADATA_NUMERIC_KEY,
_TEST_METADATA_NUMERIC_VALUE),
)
_ChannelzChannelState = grpc_channelz.ChannelState
class TestHeaderBasedAffinity(xds_url_map_testcase.XdsUrlMapTestCase):
    """Most basic header-based affinity test.

    Verifies that the received RDS carries the header hash policy and CDS
    uses RING_HASH, that all RPCs land on a single backend out of three,
    and that exactly one subchannel is READY while the other two are IDLE.
    """

    @staticmethod
    def client_init_config(rpc: str, metadata: str):
        # Send the initial RPCs with the same metadata as the test RPCs.
        # Without this, the initial RPCs carry no headers and RING_HASH
        # picks random backends. Pinning the metadata from the start is
        # necessary so that only one sub-channel is picked and used from
        # the beginning, and thus the channel creates only that one
        # sub-channel.
        return 'EmptyCall', 'EmptyCall:%s:%s' % (_TEST_METADATA_KEY,
                                                 _TEST_METADATA_VALUE_EMPTY)

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        # Point the default service at the affinity backend service.
        path_matcher["defaultService"] = GcpResourceManager(
        ).affinity_backend_service()
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        # The affinity backend service exposes 3 endpoints.
        self.assertNumEndpoints(xds_config, 3)
        # RDS must carry the header hash policy and CDS must use RING_HASH.
        self.assertEqual(
            xds_config.rds['virtualHosts'][0]['routes'][0]['route']
            ['hashPolicy'][0]['header']['headerName'], _TEST_METADATA_KEY)
        self.assertEqual(xds_config.cds[0]['lbPolicy'], 'RING_HASH')

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        stats = self.configure_and_send(test_client,
                                        rpc_types=[RpcTypeEmptyCall],
                                        metadata=_TEST_METADATA,
                                        num_rpcs=_NUM_RPCS)
        # Even though there are 3 backends, affinity must route every RPC
        # to exactly one of them.
        self.assertEqual(1, stats.num_peers)
        # Exactly one subchannel connects; the other two stay IDLE.
        self.assertLen(
            test_client.find_subchannels_with_state(
                _ChannelzChannelState.READY),
            1,
        )
        self.assertLen(
            test_client.find_subchannels_with_state(_ChannelzChannelState.IDLE),
            2,
        )
# TODO: add more test cases
# 1. based on the basic test, turn down the backend in use, then verify that all
# RPCs are sent to another backend
# 2. based on the basic test, send more RPCs with other metadata, then verify
# that they can pick another backend, and there are total of two READY
# sub-channels
Loading…
Cancel
Save