xDS interop: Replace pod_name with hostname (#30643)

pod_name shouldn't be a part of the test app, it's purely k8s' idiom.
Originally server_id was intended for this purpose, but it was missed
when support for multiple server replicas added.

This replaces pod_name and server_id with hostname and improves
replica-specific log messages, so it's clear to what server
RPCs are issued.

In addition, now all RPC logs are annotated with the hostname:port,
so the destination is clear.

Before:

```
server_app.py:76] Setting health status to serving
grpc.py:60] RPC XdsUpdateHealthService.SetServing(request=Empty({}), timeout=90, wait_for_ready=True)
grpc.py:60] RPC Health.Check(request=HealthCheckRequest({}), timeout=90, wait_for_ready=True)
server_app.py:78] Server reports status: SERVING
```

After:
```
server_app.py:89] [psm-grpc-server-69bcf749c5-bg4x5] Setting health status to NOT_SERVING
grpc.py:72] [psm-grpc-server-69bcf749c5-bg4x5:52902] RPC XdsUpdateHealthService.SetNotServing(request=Empty({}), timeout=90, wait_for_ready=True)
grpc.py:72] [psm-grpc-server-69bcf749c5-bg4x5:52902] RPC Health.Check(request=HealthCheckRequest({}), timeout=90, wait_for_ready=True)
server_app.py:92] [psm-grpc-server-69bcf749c5-bg4x5] Health status status: NOT_SERVING
```

Similarly, this adds hostname to the client app, mainly for logging.
pull/30665/head
Sergii Tkachenko 2 years ago committed by GitHub
parent 221a969e04
commit 74bd2d8360
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py
  2. 24
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py
  3. 9
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py
  4. 8
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py
  5. 29
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py
  6. 60
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
  7. 3
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py
  8. 11
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py
  9. 56
      tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
  10. 22
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  11. 6
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py
  12. 8
      tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py
  13. 2
      tools/run_tests/xds_k8s_test_driver/tests/outlier_detection_test.py
  14. 10
      tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py

@ -81,10 +81,9 @@ def debug_sock_tls(tls):
f'remote: {debug_cert(tls.remote_certificate)}')
def get_deployment_pod_ips(k8s_ns, deployment_name):
def get_deployment_pods(k8s_ns, deployment_name):
deployment = k8s_ns.get_deployment(deployment_name)
pods = k8s_ns.list_deployment_pods(deployment)
return [pod.status.pod_ip for pod in pods]
return k8s_ns.list_deployment_pods(deployment)
def debug_security_setup_negative(test_client):
@ -186,10 +185,11 @@ def main(argv):
server_name = xds_flags.SERVER_NAME.value
server_namespace = resource_prefix
server_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, server_namespace)
server_pod_ip = get_deployment_pod_ips(server_k8s_ns, server_name)[0]
server_pod = get_deployment_pods(server_k8s_ns, server_name)[0]
test_server: _XdsTestServer = _XdsTestServer(
ip=server_pod_ip,
ip=server_pod.status.pod_ip,
rpc_port=xds_flags.SERVER_PORT.value,
hostname=server_pod.metadata.name,
xds_host=xds_flags.SERVER_XDS_HOST.value,
xds_port=xds_flags.SERVER_XDS_PORT.value,
rpc_host=_SERVER_RPC_HOST.value)
@ -198,11 +198,12 @@ def main(argv):
client_name = xds_flags.CLIENT_NAME.value
client_namespace = resource_prefix
client_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace)
client_pod_ip = get_deployment_pod_ips(client_k8s_ns, client_name)[0]
client_pod = get_deployment_pods(client_k8s_ns, client_name)[0]
test_client: _XdsTestClient = _XdsTestClient(
ip=client_pod_ip,
server_target=test_server.xds_uri,
ip=client_pod.status.pod_ip,
rpc_port=xds_flags.CLIENT_PORT.value,
server_target=test_server.xds_uri,
hostname=client_pod.metadata.name,
rpc_host=_CLIENT_RPC_HOST.value)
if _SECURITY.value in ('mtls', 'tls', 'plaintext'):

@ -26,15 +26,26 @@ Message = google.protobuf.message.Message
class GrpcClientHelper:
channel: grpc.Channel
DEFAULT_RPC_DEADLINE_SEC = 90
def __init__(self, channel: grpc.Channel, stub_class: Any):
channel: grpc.Channel
# This is purely cosmetic to make RPC logs look like method calls.
log_service_name: str
# This is purely cosmetic to output the RPC target. Normally set to the
# hostname:port of the remote service, but it doesn't have to be the
# real target. This is done so that when RPC are routed to the proxy
# or port forwarding, this still is set to a useful name.
log_target: str
def __init__(self,
channel: grpc.Channel,
stub_class: Any,
*,
log_target: Optional[str] = ''):
self.channel = channel
self.stub = stub_class(channel)
# This is purely cosmetic to make RPC logs look like method calls.
self.log_service_name = re.sub('Stub$', '',
self.stub.__class__.__name__)
self.log_target = log_target or ''
def call_unary_with_deadline(
self,
@ -55,8 +66,9 @@ class GrpcClientHelper:
def _log_rpc_request(self, rpc, req, call_kwargs, log_level=logging.DEBUG):
logger.log(logging.DEBUG if log_level is None else log_level,
'RPC %s.%s(request=%s(%r), %s)', self.log_service_name, rpc,
req.__class__.__name__, json_format.MessageToDict(req),
'[%s] RPC %s.%s(request=%s(%r), %s)', self.log_target,
self.log_service_name, rpc, req.__class__.__name__,
json_format.MessageToDict(req),
', '.join({f'{k}={v}' for k, v in call_kwargs.items()}))

@ -57,8 +57,13 @@ _GetServerSocketsResponse = channelz_pb2.GetServerSocketsResponse
class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper):
stub: channelz_pb2_grpc.ChannelzStub
def __init__(self, channel: grpc.Channel):
super().__init__(channel, channelz_pb2_grpc.ChannelzStub)
def __init__(self,
channel: grpc.Channel,
*,
log_target: Optional[str] = ''):
super().__init__(channel,
channelz_pb2_grpc.ChannelzStub,
log_target=log_target)
@staticmethod
def is_sock_tcpip_address(address: Address):

@ -44,9 +44,13 @@ _ClientStatusRequest = csds_pb2.ClientStatusRequest
class CsdsClient(framework.rpc.grpc.GrpcClientHelper):
stub: csds_pb2_grpc.ClientStatusDiscoveryServiceStub
def __init__(self, channel: grpc.Channel):
def __init__(self,
channel: grpc.Channel,
*,
log_target: Optional[str] = ''):
super().__init__(channel,
csds_pb2_grpc.ClientStatusDiscoveryServiceStub)
csds_pb2_grpc.ClientStatusDiscoveryServiceStub,
log_target=log_target)
def fetch_client_status(self, **kwargs) -> Optional[ClientConfig]:
"""Fetches the active xDS configurations."""

@ -39,8 +39,13 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
STATS_PARTIAL_RESULTS_TIMEOUT_SEC = 1200
STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC = 600
def __init__(self, channel: grpc.Channel):
super().__init__(channel, test_pb2_grpc.LoadBalancerStatsServiceStub)
def __init__(self,
channel: grpc.Channel,
*,
log_target: Optional[str] = ''):
super().__init__(channel,
test_pb2_grpc.LoadBalancerStatsServiceStub,
log_target=log_target)
def get_client_stats(
self,
@ -78,9 +83,13 @@ class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper
stub: test_pb2_grpc.XdsUpdateClientConfigureServiceStub
CONFIGURE_TIMEOUT_SEC: int = 5
def __init__(self, channel: grpc.Channel):
def __init__(self,
channel: grpc.Channel,
*,
log_target: Optional[str] = ''):
super().__init__(channel,
test_pb2_grpc.XdsUpdateClientConfigureServiceStub)
test_pb2_grpc.XdsUpdateClientConfigureServiceStub,
log_target=log_target)
def configure(
self,
@ -115,8 +124,10 @@ class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper
class XdsUpdateHealthServiceClient(framework.rpc.grpc.GrpcClientHelper):
stub: test_pb2_grpc.XdsUpdateHealthServiceStub
def __init__(self, channel: grpc.Channel):
super().__init__(channel, test_pb2_grpc.XdsUpdateHealthServiceStub)
def __init__(self, channel: grpc.Channel, log_target: Optional[str] = ''):
super().__init__(channel,
test_pb2_grpc.XdsUpdateHealthServiceStub,
log_target=log_target)
def set_serving(self):
self.call_unary_with_deadline(rpc='SetServing',
@ -132,8 +143,10 @@ class XdsUpdateHealthServiceClient(framework.rpc.grpc.GrpcClientHelper):
class HealthClient(framework.rpc.grpc.GrpcClientHelper):
stub: health_pb2_grpc.HealthStub
def __init__(self, channel: grpc.Channel):
super().__init__(channel, health_pb2_grpc.HealthStub)
def __init__(self, channel: grpc.Channel, log_target: Optional[str] = ''):
super().__init__(channel,
health_pb2_grpc.HealthStub,
log_target=log_target)
def check_health(self):
return self.call_unary_with_deadline(

@ -44,12 +44,15 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
Represents RPC services implemented in Client component of the xds test app.
https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#client
"""
# A unique string identifying each client replica. Used in logging.
hostname: str
def __init__(self,
*,
ip: str,
rpc_port: int,
server_target: str,
hostname: str,
rpc_host: Optional[str] = None,
maintenance_port: Optional[int] = None):
super().__init__(rpc_host=(rpc_host or ip))
@ -57,28 +60,35 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
self.rpc_port = rpc_port
self.server_target = server_target
self.maintenance_port = maintenance_port or rpc_port
self.hostname = hostname
@property
@functools.lru_cache(None)
def load_balancer_stats(self) -> _LoadBalancerStatsServiceClient:
return _LoadBalancerStatsServiceClient(self._make_channel(
self.rpc_port))
return _LoadBalancerStatsServiceClient(
self._make_channel(self.rpc_port),
log_target=f'{self.hostname}:{self.rpc_port}')
@property
@functools.lru_cache(None)
def update_config(self):
return _XdsUpdateClientConfigureServiceClient(
self._make_channel(self.rpc_port))
self._make_channel(self.rpc_port),
log_target=f'{self.hostname}:{self.rpc_port}')
@property
@functools.lru_cache(None)
def channelz(self) -> _ChannelzServiceClient:
return _ChannelzServiceClient(self._make_channel(self.maintenance_port))
return _ChannelzServiceClient(
self._make_channel(self.maintenance_port),
log_target=f'{self.hostname}:{self.maintenance_port}')
@property
@functools.lru_cache(None)
def csds(self) -> _CsdsClient:
return _CsdsClient(self._make_channel(self.maintenance_port))
return _CsdsClient(
self._make_channel(self.maintenance_port),
log_target=f'{self.hostname}:{self.maintenance_port}')
def get_load_balancer_stats(
self,
@ -114,19 +124,22 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
_ChannelzChannelState.READY)
# Get the first subchannel of the active channel to the server.
logger.debug(
'Retrieving client -> server socket, '
'channel_id: %s, subchannel: %s', channel.ref.channel_id,
channel.subchannel_ref[0].name)
'[%s] Retrieving client -> server socket, '
'channel_id: %s, subchannel: %s', self.hostname,
channel.ref.channel_id, channel.subchannel_ref[0].name)
subchannel, *subchannels = list(
self.channelz.list_channel_subchannels(channel))
if subchannels:
logger.warning('Unexpected subchannels: %r', subchannels)
logger.warning('[%s] Unexpected subchannels: %r', self.hostname,
subchannels)
# Get the first socket of the subchannel
socket, *sockets = list(
self.channelz.list_subchannels_sockets(subchannel))
if sockets:
logger.warning('Unexpected sockets: %r', subchannels)
logger.debug('Found client -> server socket: %s', socket.ref.name)
logger.warning('[%s] Unexpected sockets: %r', self.hostname,
subchannels)
logger.debug('[%s] Found client -> server socket: %s', self.hostname,
socket.ref.name)
return socket
def wait_for_server_channel_state(
@ -146,14 +159,13 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
wait_max=_timedelta(seconds=25),
timeout=_timedelta(minutes=5) if timeout is None else timeout)
logger.info('Waiting for client %s to report a %s channel to %s',
self.ip, _ChannelzChannelState.Name(state),
self.server_target)
logger.info('[%s] Waiting to report a %s channel to %s', self.hostname,
_ChannelzChannelState.Name(state), self.server_target)
channel = retryer(self.find_server_channel_with_state,
state,
rpc_deadline=rpc_deadline)
logger.info('Client %s channel to %s transitioned to state %s:\n%s',
self.ip, self.server_target,
logger.info('[%s] Channel to %s transitioned to state %s:\n%s',
self.hostname, self.server_target,
_ChannelzChannelState.Name(state), channel)
return channel
@ -169,7 +181,8 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
for channel in self.get_server_channels(**rpc_params):
channel_state: _ChannelzChannelState = channel.data.state.state
logger.info('Server channel: %s, state: %s', channel.ref.name,
logger.info('[%s] Server channel: %s, state: %s', self.hostname,
channel.ref.name,
_ChannelzChannelState.Name(channel_state))
if channel_state is state:
if check_subchannel:
@ -178,7 +191,8 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
try:
subchannel = self.find_subchannel_with_state(
channel, state, **rpc_params)
logger.info('Found subchannel in state %s: %s',
logger.info('[%s] Found subchannel in state %s: %s',
self.hostname,
_ChannelzChannelState.Name(state),
subchannel)
except self.NotFound as e:
@ -188,8 +202,8 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
return channel
raise self.NotFound(
f'Client has no {_ChannelzChannelState.Name(state)} channel with '
'the server')
f'[{self.hostname}] Client has no '
f'{_ChannelzChannelState.Name(state)} channel with the server')
def get_server_channels(self, **kwargs) -> Iterable[_ChannelzChannel]:
return self.channelz.find_channels_for_target(self.server_target,
@ -203,9 +217,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
if subchannel.data.state.state is state:
return subchannel
raise self.NotFound(
f'Not found a {_ChannelzChannelState.Name(state)} '
f'subchannel for channel_id {channel.ref.channel_id}')
raise self.NotFound(f'[{self.hostname}] Not found '
f'a {_ChannelzChannelState.Name(state)} subchannel '
f'for channel_id {channel.ref.channel_id}')
def find_subchannels_with_state(self, state: _ChannelzChannelState,
**kwargs) -> List[_ChannelzSubchannel]:

@ -161,7 +161,8 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
return XdsTestClient(ip=pod_ip,
rpc_port=rpc_port,
server_target=server_target,
rpc_host=rpc_host)
rpc_host=rpc_host,
hostname=pod.metadata.name)
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.port_forwarder:

@ -102,7 +102,6 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
test_port=DEFAULT_TEST_PORT,
maintenance_port=None,
secure_mode=False,
server_id=None,
replica_count=1) -> List[XdsTestServer]:
# Implementation detail: in secure mode, maintenance ("backchannel")
# port must be different from the test port so communication with
@ -127,9 +126,9 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
logger.info(
'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s '
'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s',
'maintenance_port=%s secure_mode=%s replica_count=%s',
self.deployment_name, self.k8s_namespace.name, test_port,
maintenance_port, secure_mode, server_id, replica_count)
maintenance_port, secure_mode, replica_count)
self._logs_explorer_link(deployment_name=self.deployment_name,
namespace_name=self.k8s_namespace.name,
gcp_project=self.gcp_project,
@ -180,7 +179,6 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
replica_count=replica_count,
test_port=test_port,
maintenance_port=maintenance_port,
server_id=server_id,
secure_mode=secure_mode)
self._wait_deployment_with_available_replicas(self.deployment_name,
@ -210,11 +208,10 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
servers.append(
XdsTestServer(ip=pod_ip,
rpc_port=test_port,
hostname=pod_name,
maintenance_port=local_port,
secure_mode=secure_mode,
server_id=server_id,
rpc_host=rpc_host,
pod_name=pod_name))
rpc_host=rpc_host))
return servers
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ

@ -35,52 +35,61 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
Represents RPC services implemented in Server component of the xDS test app.
https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
"""
# A unique host name identifying each server replica.
# Server implementation must return this in the SimpleResponse.hostname,
# which client uses as the key in rpcs_by_peer map.
hostname: str
def __init__(self,
*,
ip: str,
rpc_port: int,
hostname: str,
maintenance_port: Optional[int] = None,
secure_mode: Optional[bool] = False,
server_id: Optional[str] = None,
xds_host: Optional[str] = None,
xds_port: Optional[int] = None,
rpc_host: Optional[str] = None,
pod_name: Optional[str] = None):
rpc_host: Optional[str] = None):
super().__init__(rpc_host=(rpc_host or ip))
self.ip = ip
self.rpc_port = rpc_port
self.hostname = hostname
self.maintenance_port = maintenance_port or rpc_port
self.secure_mode = secure_mode
self.server_id = server_id
self.xds_host, self.xds_port = xds_host, xds_port
self.pod_name = pod_name
@property
@functools.lru_cache(None)
def channelz(self) -> _ChannelzServiceClient:
return _ChannelzServiceClient(self._make_channel(self.maintenance_port))
return _ChannelzServiceClient(
self._make_channel(self.maintenance_port),
log_target=f'{self.hostname}:{self.maintenance_port}')
@property
@functools.lru_cache(None)
def update_health_service_client(self) -> _XdsUpdateHealthServiceClient:
return _XdsUpdateHealthServiceClient(
self._make_channel(self.maintenance_port))
self._make_channel(self.maintenance_port),
log_target=f'{self.hostname}:{self.maintenance_port}')
@property
@functools.lru_cache(None)
def health_client(self) -> _HealthClient:
return _HealthClient(self._make_channel(self.maintenance_port))
return _HealthClient(
self._make_channel(self.maintenance_port),
log_target=f'{self.hostname}:{self.maintenance_port}')
def set_serving(self):
logger.info('Setting health status to serving')
logger.info('[%s] Setting health status to SERVING', self.hostname)
self.update_health_service_client.set_serving()
logger.info('Server reports %s', self.health_client.check_health())
logger.info('[%s] Health status %s', self.hostname,
self.health_client.check_health())
def set_not_serving(self):
logger.info('Setting health status to not serving')
logger.info('[%s] Setting health status to NOT_SERVING', self.hostname)
self.update_health_service_client.set_not_serving()
logger.info('Server reports %s', self.health_client.check_health())
logger.info('[%s] Health status %s', self.hostname,
self.health_client.check_health())
def set_xds_address(self, xds_host, xds_port: Optional[int] = None):
self.xds_host, self.xds_port = xds_host, xds_port
@ -107,8 +116,8 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
"""
server = self.channelz.find_server_listening_on_port(self.rpc_port)
if not server:
raise self.NotFound(
f'Server listening on port {self.rpc_port} not found')
raise self.NotFound(f'[{self.hostname}] Server'
f'listening on port {self.rpc_port} not found')
return server
def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]:
@ -131,16 +140,19 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
GrpcApp.NotFound: Server socket matching client socket not found.
"""
client_local = self.channelz.sock_address_to_str(client_socket.local)
logger.debug('Looking for a server socket connected to the client %s',
client_local)
logger.debug(
'[%s] Looking for a server socket connected '
'to the client %s', self.hostname, client_local)
server_socket = self.channelz.find_server_socket_matching_client(
self.get_test_server_sockets(), client_socket)
if not server_socket:
raise self.NotFound(
f'Server socket to client {client_local} not found')
logger.info('Found matching socket pair: server(%s) <-> client(%s)',
self.channelz.sock_addresses_pretty(server_socket),
self.channelz.sock_addresses_pretty(client_socket))
raise self.NotFound(f'[{self.hostname}] Socket '
f'to client {client_local} not found')
logger.info(
'[%s] Found matching socket pair: '
'server(%s) <-> client(%s)', self.hostname,
self.channelz.sock_addresses_pretty(server_socket),
self.channelz.sock_addresses_pretty(client_socket))
return server_socket

@ -247,11 +247,11 @@ class XdsKubernetesBaseTestCase(absltest.TestCase):
before_stats = test_client.get_load_balancer_accumulated_stats()
response_type = 'LoadBalancerAccumulatedStatsResponse'
logging.info('Received %s from test client %s: before:\n%s',
response_type, test_client.ip, before_stats)
response_type, test_client.hostname, before_stats)
time.sleep(duration.total_seconds())
after_stats = test_client.get_load_balancer_accumulated_stats()
logging.info('Received %s from test client %s: after:\n%s',
response_type, test_client.ip, after_stats)
response_type, test_client.hostname, after_stats)
diff_stats = self.diffAccumulatedStatsPerMethod(before_stats,
after_stats)
@ -283,20 +283,20 @@ class XdsKubernetesBaseTestCase(absltest.TestCase):
def _assertRpcsEventuallyGoToGivenServers(self, test_client: XdsTestClient,
servers: List[XdsTestServer],
num_rpcs: int):
server_names = [server.pod_name for server in servers]
logger.info('Verifying RPCs go to %s', server_names)
server_hostnames = [server.hostname for server in servers]
logger.info('Verifying RPCs go to servers %s', server_hostnames)
lb_stats = self.getClientRpcStats(test_client, num_rpcs)
failed = int(lb_stats.num_failures)
self.assertLessEqual(
failed,
0,
msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
for server_name in server_names:
self.assertIn(server_name, lb_stats.rpcs_by_peer,
f'{server_name} did not receive RPCs')
for peer in lb_stats.rpcs_by_peer.keys():
self.assertIn(peer, server_names,
f'Unexpected server {peer} received RPCs')
for server_hostname in server_hostnames:
self.assertIn(server_hostname, lb_stats.rpcs_by_peer,
f'Server {server_hostname} did not receive RPCs')
for server_hostname in lb_stats.rpcs_by_peer.keys():
self.assertIn(server_hostname, server_hostnames,
f'Unexpected server {server_hostname} received RPCs')
def assertXdsConfigExists(self, test_client: XdsTestClient):
config = test_client.csds.fetch_client_status(log_level=logging.INFO)
@ -379,7 +379,7 @@ class XdsKubernetesBaseTestCase(absltest.TestCase):
lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
logger.info(
'Received LoadBalancerStatsResponse from test client %s:\n%s',
test_client.ip, lb_stats)
test_client.hostname, lb_stats)
return lb_stats
def assertAllBackendsReceivedRpcs(self, lb_stats):

@ -423,7 +423,7 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
test_client.get_load_balancer_stats(num_rpcs=num_rpcs))
logging.info(
'Received LoadBalancerStatsResponse from test client %s:\n%s',
test_client.ip, json.dumps(json_lb_stats, indent=2))
test_client.hostname, json.dumps(json_lb_stats, indent=2))
return RpcDistributionStats(json_lb_stats)
def assertNumEndpoints(self, xds_config: DumpedXdsConfig, k: int) -> None:
@ -441,12 +441,12 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
before_stats = test_client.get_load_balancer_accumulated_stats()
logging.info(
'Received LoadBalancerAccumulatedStatsResponse from test client %s: before:\n%s',
test_client.ip, before_stats)
test_client.hostname, before_stats)
time.sleep(length)
after_stats = test_client.get_load_balancer_accumulated_stats()
logging.info(
'Received LoadBalancerAccumulatedStatsResponse from test client %s: after: \n%s',
test_client.ip, after_stats)
test_client.hostname, after_stats)
# Validate the diff
for expected_result in expected:

@ -139,11 +139,9 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
rpc_distribution.raw['rpcsByPeer'].keys())[0]
with self.subTest('11_turn_down_server_in_use'):
for s in test_servers:
if s.pod_name == first_backend_inuse:
logging.info('setting backend %s to NOT_SERVING',
s.pod_name)
s.set_not_serving()
for server in test_servers:
if server.hostname == first_backend_inuse:
server.set_not_serving()
with self.subTest('12_wait_for_unhealth_status_propagation'):
deadline = time.time() + _TD_PROPAGATE_TIMEOUT

@ -115,7 +115,7 @@ class OutlierDetectionTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
rpc_types=rpc_types,
metadata=(
(RpcTypeUnaryCall, 'rpc-behavior',
f'hostname={test_servers[0].pod_name} error-code-2'),))
f'hostname={test_servers[0].hostname} error-code-2'),))
self.assertRpcsEventuallyGoToGivenServers(test_client,
test_servers[1:])

@ -74,12 +74,12 @@ class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
self.assertEqual(total_requests_received, num_rpcs,
'Wrong number of RPCS')
for server in test_servers:
pod_name = server.pod_name
self.assertIn(pod_name, rpcs_by_peer,
f'pod {pod_name} did not receive RPCs')
hostname = server.hostname
self.assertIn(hostname, rpcs_by_peer,
f'Server {hostname} did not receive RPCs')
self.assertLessEqual(
abs(rpcs_by_peer[pod_name] - expected_rpcs_per_replica), 1,
f'Wrong number of RPCs for {pod_name}')
abs(rpcs_by_peer[hostname] - expected_rpcs_per_replica), 1,
f'Wrong number of RPCs for server {hostname}')
if __name__ == '__main__':

Loading…
Cancel
Save