diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py index c8e0cd3bab6..290f6aa2c3b 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py @@ -81,10 +81,9 @@ def debug_sock_tls(tls): f'remote: {debug_cert(tls.remote_certificate)}') -def get_deployment_pod_ips(k8s_ns, deployment_name): +def get_deployment_pods(k8s_ns, deployment_name): deployment = k8s_ns.get_deployment(deployment_name) - pods = k8s_ns.list_deployment_pods(deployment) - return [pod.status.pod_ip for pod in pods] + return k8s_ns.list_deployment_pods(deployment) def debug_security_setup_negative(test_client): @@ -186,10 +185,11 @@ def main(argv): server_name = xds_flags.SERVER_NAME.value server_namespace = resource_prefix server_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, server_namespace) - server_pod_ip = get_deployment_pod_ips(server_k8s_ns, server_name)[0] + server_pod = get_deployment_pods(server_k8s_ns, server_name)[0] test_server: _XdsTestServer = _XdsTestServer( - ip=server_pod_ip, + ip=server_pod.status.pod_ip, rpc_port=xds_flags.SERVER_PORT.value, + hostname=server_pod.metadata.name, xds_host=xds_flags.SERVER_XDS_HOST.value, xds_port=xds_flags.SERVER_XDS_PORT.value, rpc_host=_SERVER_RPC_HOST.value) @@ -198,11 +198,12 @@ def main(argv): client_name = xds_flags.CLIENT_NAME.value client_namespace = resource_prefix client_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace) - client_pod_ip = get_deployment_pod_ips(client_k8s_ns, client_name)[0] + client_pod = get_deployment_pods(client_k8s_ns, client_name)[0] test_client: _XdsTestClient = _XdsTestClient( - ip=client_pod_ip, - server_target=test_server.xds_uri, + ip=client_pod.status.pod_ip, rpc_port=xds_flags.CLIENT_PORT.value, + server_target=test_server.xds_uri, + hostname=client_pod.metadata.name, rpc_host=_CLIENT_RPC_HOST.value) if _SECURITY.value in ('mtls', 'tls', 'plaintext'): diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py index 5ee953406c0..626c28d4b40 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py @@ -26,15 +26,26 @@ Message = google.protobuf.message.Message class GrpcClientHelper: - channel: grpc.Channel DEFAULT_RPC_DEADLINE_SEC = 90 - - def __init__(self, channel: grpc.Channel, stub_class: Any): + channel: grpc.Channel + # This is purely cosmetic to make RPC logs look like method calls. + log_service_name: str + # This is purely cosmetic to output the RPC target. Normally set to the + # hostname:port of the remote service, but it doesn't have to be the + # real target. This is done so that when RPC are routed to the proxy + # or port forwarding, this still is set to a useful name. + log_target: str + + def __init__(self, + channel: grpc.Channel, + stub_class: Any, + *, + log_target: Optional[str] = ''): self.channel = channel self.stub = stub_class(channel) - # This is purely cosmetic to make RPC logs look like method calls. self.log_service_name = re.sub('Stub$', '', self.stub.__class__.__name__) + self.log_target = log_target or '' def call_unary_with_deadline( self, @@ -55,8 +66,9 @@ class GrpcClientHelper: def _log_rpc_request(self, rpc, req, call_kwargs, log_level=logging.DEBUG): logger.log(logging.DEBUG if log_level is None else log_level, - 'RPC %s.%s(request=%s(%r), %s)', self.log_service_name, rpc, - req.__class__.__name__, json_format.MessageToDict(req), + '[%s] RPC %s.%s(request=%s(%r), %s)', self.log_target, + self.log_service_name, rpc, req.__class__.__name__, + json_format.MessageToDict(req), ', '.join({f'{k}={v}' for k, v in call_kwargs.items()})) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py index b4e6b18761d..07c78717a79 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py @@ -57,8 +57,13 @@ _GetServerSocketsResponse = channelz_pb2.GetServerSocketsResponse class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper): stub: channelz_pb2_grpc.ChannelzStub - def __init__(self, channel: grpc.Channel): - super().__init__(channel, channelz_pb2_grpc.ChannelzStub) + def __init__(self, + channel: grpc.Channel, + *, + log_target: Optional[str] = ''): + super().__init__(channel, + channelz_pb2_grpc.ChannelzStub, + log_target=log_target) @staticmethod def is_sock_tcpip_address(address: Address): diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py index 622ba60a7b6..947f88d8fb5 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_csds.py @@ -44,9 +44,13 @@ _ClientStatusRequest = csds_pb2.ClientStatusRequest class CsdsClient(framework.rpc.grpc.GrpcClientHelper): stub: csds_pb2_grpc.ClientStatusDiscoveryServiceStub - def __init__(self, channel: grpc.Channel): + def __init__(self, + channel: grpc.Channel, + *, + log_target: Optional[str] = ''): super().__init__(channel, - csds_pb2_grpc.ClientStatusDiscoveryServiceStub) + csds_pb2_grpc.ClientStatusDiscoveryServiceStub, + log_target=log_target) def fetch_client_status(self, **kwargs) -> Optional[ClientConfig]: """Fetches the active xDS configurations.""" diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py index 7d010b33a32..7d783a59053 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py @@ -39,8 +39,13 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper): STATS_PARTIAL_RESULTS_TIMEOUT_SEC = 1200 STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC = 600 - def __init__(self, channel: grpc.Channel): - super().__init__(channel, test_pb2_grpc.LoadBalancerStatsServiceStub) + def __init__(self, + channel: grpc.Channel, + *, + log_target: Optional[str] = ''): + super().__init__(channel, + test_pb2_grpc.LoadBalancerStatsServiceStub, + log_target=log_target) def get_client_stats( self, @@ -78,9 +83,13 @@ class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper stub: test_pb2_grpc.XdsUpdateClientConfigureServiceStub CONFIGURE_TIMEOUT_SEC: int = 5 - def __init__(self, channel: grpc.Channel): + def __init__(self, + channel: grpc.Channel, + *, + log_target: Optional[str] = ''): super().__init__(channel, - test_pb2_grpc.XdsUpdateClientConfigureServiceStub) + test_pb2_grpc.XdsUpdateClientConfigureServiceStub, + log_target=log_target) def configure( self, @@ -115,8 +124,10 @@ class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper class XdsUpdateHealthServiceClient(framework.rpc.grpc.GrpcClientHelper): stub: test_pb2_grpc.XdsUpdateHealthServiceStub - def __init__(self, channel: grpc.Channel): - super().__init__(channel, test_pb2_grpc.XdsUpdateHealthServiceStub) + def __init__(self, channel: grpc.Channel, log_target: Optional[str] = ''): + super().__init__(channel, + test_pb2_grpc.XdsUpdateHealthServiceStub, + log_target=log_target) def set_serving(self): self.call_unary_with_deadline(rpc='SetServing', @@ -132,8 +143,10 @@ class XdsUpdateHealthServiceClient(framework.rpc.grpc.GrpcClientHelper): class HealthClient(framework.rpc.grpc.GrpcClientHelper): stub: health_pb2_grpc.HealthStub - def __init__(self, channel: grpc.Channel): - super().__init__(channel, health_pb2_grpc.HealthStub) + def __init__(self, channel: grpc.Channel, log_target: Optional[str] = ''): + super().__init__(channel, + health_pb2_grpc.HealthStub, + log_target=log_target) def check_health(self): return self.call_unary_with_deadline( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index b28d5b77ada..c0a804aaf87 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -44,12 +44,15 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): Represents RPC services implemented in Client component of the xds test app. https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#client """ + # A unique string identifying each client replica. Used in logging. + hostname: str def __init__(self, *, ip: str, rpc_port: int, server_target: str, + hostname: str, rpc_host: Optional[str] = None, maintenance_port: Optional[int] = None): super().__init__(rpc_host=(rpc_host or ip)) @@ -57,28 +60,35 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): self.rpc_port = rpc_port self.server_target = server_target self.maintenance_port = maintenance_port or rpc_port + self.hostname = hostname @property @functools.lru_cache(None) def load_balancer_stats(self) -> _LoadBalancerStatsServiceClient: - return _LoadBalancerStatsServiceClient(self._make_channel( - self.rpc_port)) + return _LoadBalancerStatsServiceClient( + self._make_channel(self.rpc_port), + log_target=f'{self.hostname}:{self.rpc_port}') @property @functools.lru_cache(None) def update_config(self): return _XdsUpdateClientConfigureServiceClient( - self._make_channel(self.rpc_port)) + self._make_channel(self.rpc_port), + log_target=f'{self.hostname}:{self.rpc_port}') @property @functools.lru_cache(None) def channelz(self) -> _ChannelzServiceClient: - return _ChannelzServiceClient(self._make_channel(self.maintenance_port)) + return _ChannelzServiceClient( + self._make_channel(self.maintenance_port), + log_target=f'{self.hostname}:{self.maintenance_port}') @property @functools.lru_cache(None) def csds(self) -> _CsdsClient: - return _CsdsClient(self._make_channel(self.maintenance_port)) + return _CsdsClient( + self._make_channel(self.maintenance_port), + log_target=f'{self.hostname}:{self.maintenance_port}') def get_load_balancer_stats( self, @@ -114,19 +124,22 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): _ChannelzChannelState.READY) # Get the first subchannel of the active channel to the server. logger.debug( - 'Retrieving client -> server socket, ' - 'channel_id: %s, subchannel: %s', channel.ref.channel_id, - channel.subchannel_ref[0].name) + '[%s] Retrieving client -> server socket, ' + 'channel_id: %s, subchannel: %s', self.hostname, + channel.ref.channel_id, channel.subchannel_ref[0].name) subchannel, *subchannels = list( self.channelz.list_channel_subchannels(channel)) if subchannels: - logger.warning('Unexpected subchannels: %r', subchannels) + logger.warning('[%s] Unexpected subchannels: %r', self.hostname, + subchannels) # Get the first socket of the subchannel socket, *sockets = list( self.channelz.list_subchannels_sockets(subchannel)) if sockets: - logger.warning('Unexpected sockets: %r', subchannels) - logger.debug('Found client -> server socket: %s', socket.ref.name) + logger.warning('[%s] Unexpected sockets: %r', self.hostname, + subchannels) + logger.debug('[%s] Found client -> server socket: %s', self.hostname, + socket.ref.name) return socket def wait_for_server_channel_state( @@ -146,14 +159,13 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): wait_max=_timedelta(seconds=25), timeout=_timedelta(minutes=5) if timeout is None else timeout) - logger.info('Waiting for client %s to report a %s channel to %s', - self.ip, _ChannelzChannelState.Name(state), - self.server_target) + logger.info('[%s] Waiting to report a %s channel to %s', self.hostname, + _ChannelzChannelState.Name(state), self.server_target) channel = retryer(self.find_server_channel_with_state, state, rpc_deadline=rpc_deadline) - logger.info('Client %s channel to %s transitioned to state %s:\n%s', - self.ip, self.server_target, + logger.info('[%s] Channel to %s transitioned to state %s:\n%s', + self.hostname, self.server_target, _ChannelzChannelState.Name(state), channel) return channel @@ -169,7 +181,8 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): for channel in self.get_server_channels(**rpc_params): channel_state: _ChannelzChannelState = channel.data.state.state - logger.info('Server channel: %s, state: %s', channel.ref.name, + logger.info('[%s] Server channel: %s, state: %s', self.hostname, + channel.ref.name, _ChannelzChannelState.Name(channel_state)) if channel_state is state: if check_subchannel: @@ -178,7 +191,8 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): try: subchannel = self.find_subchannel_with_state( channel, state, **rpc_params) - logger.info('Found subchannel in state %s: %s', + logger.info('[%s] Found subchannel in state %s: %s', + self.hostname, _ChannelzChannelState.Name(state), subchannel) except self.NotFound as e: @@ -188,8 +202,8 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): return channel raise self.NotFound( - f'Client has no {_ChannelzChannelState.Name(state)} channel with ' - 'the server') + f'[{self.hostname}] Client has no ' + f'{_ChannelzChannelState.Name(state)} channel with the server') def get_server_channels(self, **kwargs) -> Iterable[_ChannelzChannel]: return self.channelz.find_channels_for_target(self.server_target, @@ -203,9 +217,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): if subchannel.data.state.state is state: return subchannel - raise self.NotFound( - f'Not found a {_ChannelzChannelState.Name(state)} ' - f'subchannel for channel_id {channel.ref.channel_id}') + raise self.NotFound(f'[{self.hostname}] Not found ' + f'a {_ChannelzChannelState.Name(state)} subchannel ' + f'for channel_id {channel.ref.channel_id}') def find_subchannels_with_state(self, state: _ChannelzChannelState, **kwargs) -> List[_ChannelzSubchannel]: diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py index 591022608b2..05aaf3c1e80 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -161,7 +161,8 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): return XdsTestClient(ip=pod_ip, rpc_port=rpc_port, server_target=server_target, - rpc_host=rpc_host) + rpc_host=rpc_host, + hostname=pod.metadata.name) def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ if self.port_forwarder: diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py index e10114841ba..acb1e52bf6b 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -102,7 +102,6 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): test_port=DEFAULT_TEST_PORT, maintenance_port=None, secure_mode=False, - server_id=None, replica_count=1) -> List[XdsTestServer]: # Implementation detail: in secure mode, maintenance ("backchannel") # port must be different from the test port so communication with @@ -127,9 +126,9 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): logger.info( 'Deploying xDS test server "%s" to k8s namespace %s: test_port=%s ' - 'maintenance_port=%s secure_mode=%s server_id=%s replica_count=%s', + 'maintenance_port=%s secure_mode=%s replica_count=%s', self.deployment_name, self.k8s_namespace.name, test_port, - maintenance_port, secure_mode, server_id, replica_count) + maintenance_port, secure_mode, replica_count) self._logs_explorer_link(deployment_name=self.deployment_name, namespace_name=self.k8s_namespace.name, gcp_project=self.gcp_project, @@ -180,7 +179,6 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): replica_count=replica_count, test_port=test_port, maintenance_port=maintenance_port, - server_id=server_id, secure_mode=secure_mode) self._wait_deployment_with_available_replicas(self.deployment_name, @@ -210,11 +208,10 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): servers.append( XdsTestServer(ip=pod_ip, rpc_port=test_port, + hostname=pod_name, maintenance_port=local_port, secure_mode=secure_mode, - server_id=server_id, - rpc_host=rpc_host, - pod_name=pod_name)) + rpc_host=rpc_host)) return servers def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py index 8349c6bb6a3..973f8b99159 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py @@ -35,52 +35,61 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp): Represents RPC services implemented in Server component of the xDS test app. https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server """ + # A unique host name identifying each server replica. + # Server implementation must return this in the SimpleResponse.hostname, + # which client uses as the key in rpcs_by_peer map. + hostname: str def __init__(self, *, ip: str, rpc_port: int, + hostname: str, maintenance_port: Optional[int] = None, secure_mode: Optional[bool] = False, - server_id: Optional[str] = None, xds_host: Optional[str] = None, xds_port: Optional[int] = None, - rpc_host: Optional[str] = None, - pod_name: Optional[str] = None): + rpc_host: Optional[str] = None): super().__init__(rpc_host=(rpc_host or ip)) self.ip = ip self.rpc_port = rpc_port + self.hostname = hostname self.maintenance_port = maintenance_port or rpc_port self.secure_mode = secure_mode - self.server_id = server_id self.xds_host, self.xds_port = xds_host, xds_port - self.pod_name = pod_name @property @functools.lru_cache(None) def channelz(self) -> _ChannelzServiceClient: - return _ChannelzServiceClient(self._make_channel(self.maintenance_port)) + return _ChannelzServiceClient( + self._make_channel(self.maintenance_port), + log_target=f'{self.hostname}:{self.maintenance_port}') @property @functools.lru_cache(None) def update_health_service_client(self) -> _XdsUpdateHealthServiceClient: return _XdsUpdateHealthServiceClient( - self._make_channel(self.maintenance_port)) + self._make_channel(self.maintenance_port), + log_target=f'{self.hostname}:{self.maintenance_port}') @property @functools.lru_cache(None) def health_client(self) -> _HealthClient: - return _HealthClient(self._make_channel(self.maintenance_port)) + return _HealthClient( + self._make_channel(self.maintenance_port), + log_target=f'{self.hostname}:{self.maintenance_port}') def set_serving(self): - logger.info('Setting health status to serving') + logger.info('[%s] Setting health status to SERVING', self.hostname) self.update_health_service_client.set_serving() - logger.info('Server reports %s', self.health_client.check_health()) + logger.info('[%s] Health status %s', self.hostname, + self.health_client.check_health()) def set_not_serving(self): - logger.info('Setting health status to not serving') + logger.info('[%s] Setting health status to NOT_SERVING', self.hostname) self.update_health_service_client.set_not_serving() - logger.info('Server reports %s', self.health_client.check_health()) + logger.info('[%s] Health status %s', self.hostname, + self.health_client.check_health()) def set_xds_address(self, xds_host, xds_port: Optional[int] = None): self.xds_host, self.xds_port = xds_host, xds_port @@ -107,8 +116,8 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp): """ server = self.channelz.find_server_listening_on_port(self.rpc_port) if not server: - raise self.NotFound( - f'Server listening on port {self.rpc_port} not found') + raise self.NotFound(f'[{self.hostname}] Server' + f'listening on port {self.rpc_port} not found') return server def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]: @@ -131,16 +140,19 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp): GrpcApp.NotFound: Server socket matching client socket not found. """ client_local = self.channelz.sock_address_to_str(client_socket.local) - logger.debug('Looking for a server socket connected to the client %s', - client_local) + logger.debug( + '[%s] Looking for a server socket connected ' + 'to the client %s', self.hostname, client_local) server_socket = self.channelz.find_server_socket_matching_client( self.get_test_server_sockets(), client_socket) if not server_socket: - raise self.NotFound( - f'Server socket to client {client_local} not found') - - logger.info('Found matching socket pair: server(%s) <-> client(%s)', - self.channelz.sock_addresses_pretty(server_socket), - self.channelz.sock_addresses_pretty(client_socket)) + raise self.NotFound(f'[{self.hostname}] Socket ' + f'to client {client_local} not found') + + logger.info( + '[%s] Found matching socket pair: ' + 'server(%s) <-> client(%s)', self.hostname, + self.channelz.sock_addresses_pretty(server_socket), + self.channelz.sock_addresses_pretty(client_socket)) return server_socket diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index 61df158b10e..31073b75f24 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -247,11 +247,11 @@ class XdsKubernetesBaseTestCase(absltest.TestCase): before_stats = test_client.get_load_balancer_accumulated_stats() response_type = 'LoadBalancerAccumulatedStatsResponse' logging.info('Received %s from test client %s: before:\n%s', - response_type, test_client.ip, before_stats) + response_type, test_client.hostname, before_stats) time.sleep(duration.total_seconds()) after_stats = test_client.get_load_balancer_accumulated_stats() logging.info('Received %s from test client %s: after:\n%s', - response_type, test_client.ip, after_stats) + response_type, test_client.hostname, after_stats) diff_stats = self.diffAccumulatedStatsPerMethod(before_stats, after_stats) @@ -283,20 +283,20 @@ class XdsKubernetesBaseTestCase(absltest.TestCase): def _assertRpcsEventuallyGoToGivenServers(self, test_client: XdsTestClient, servers: List[XdsTestServer], num_rpcs: int): - server_names = [server.pod_name for server in servers] - logger.info('Verifying RPCs go to %s', server_names) + server_hostnames = [server.hostname for server in servers] + logger.info('Verifying RPCs go to servers %s', server_hostnames) lb_stats = self.getClientRpcStats(test_client, num_rpcs) failed = int(lb_stats.num_failures) self.assertLessEqual( failed, 0, msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed') - for server_name in server_names: - self.assertIn(server_name, lb_stats.rpcs_by_peer, - f'{server_name} did not receive RPCs') - for peer in lb_stats.rpcs_by_peer.keys(): - self.assertIn(peer, server_names, - f'Unexpected server {peer} received RPCs') + for server_hostname in server_hostnames: + self.assertIn(server_hostname, lb_stats.rpcs_by_peer, + f'Server {server_hostname} did not receive RPCs') + for server_hostname in lb_stats.rpcs_by_peer.keys(): + self.assertIn(server_hostname, server_hostnames, + f'Unexpected server {server_hostname} received RPCs') def assertXdsConfigExists(self, test_client: XdsTestClient): config = test_client.csds.fetch_client_status(log_level=logging.INFO) @@ -379,7 +379,7 @@ class XdsKubernetesBaseTestCase(absltest.TestCase): lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs) logger.info( 'Received LoadBalancerStatsResponse from test client %s:\n%s', - test_client.ip, lb_stats) + test_client.hostname, lb_stats) return lb_stats def assertAllBackendsReceivedRpcs(self, lb_stats): diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py index 4fc696f824d..b87db987e3d 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py @@ -423,7 +423,7 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): test_client.get_load_balancer_stats(num_rpcs=num_rpcs)) logging.info( 'Received LoadBalancerStatsResponse from test client %s:\n%s', - test_client.ip, json.dumps(json_lb_stats, indent=2)) + test_client.hostname, json.dumps(json_lb_stats, indent=2)) return RpcDistributionStats(json_lb_stats) def assertNumEndpoints(self, xds_config: DumpedXdsConfig, k: int) -> None: @@ -441,12 +441,12 @@ class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase): before_stats = test_client.get_load_balancer_accumulated_stats() logging.info( 'Received LoadBalancerAccumulatedStatsResponse from test client %s: before:\n%s', - test_client.ip, before_stats) + test_client.hostname, before_stats) time.sleep(length) after_stats = test_client.get_load_balancer_accumulated_stats() logging.info( 'Received LoadBalancerAccumulatedStatsResponse from test client %s: after: \n%s', - test_client.ip, after_stats) + test_client.hostname, after_stats) # Validate the diff for expected_result in expected: diff --git a/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py b/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py index cf1bdaae39b..b62af71367f 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/affinity_test.py @@ -139,11 +139,9 @@ class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): rpc_distribution.raw['rpcsByPeer'].keys())[0] with self.subTest('11_turn_down_server_in_use'): - for s in test_servers: - if s.pod_name == first_backend_inuse: - logging.info('setting backend %s to NOT_SERVING', - s.pod_name) - s.set_not_serving() + for server in test_servers: + if server.hostname == first_backend_inuse: + server.set_not_serving() with self.subTest('12_wait_for_unhealth_status_propagation'): deadline = time.time() + _TD_PROPAGATE_TIMEOUT diff --git a/tools/run_tests/xds_k8s_test_driver/tests/outlier_detection_test.py b/tools/run_tests/xds_k8s_test_driver/tests/outlier_detection_test.py index 73008c14477..966b5b33c66 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/outlier_detection_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/outlier_detection_test.py @@ -115,7 +115,7 @@ class OutlierDetectionTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): rpc_types=rpc_types, metadata=( (RpcTypeUnaryCall, 'rpc-behavior', - f'hostname={test_servers[0].pod_name} error-code-2'),)) + f'hostname={test_servers[0].hostname} error-code-2'),)) self.assertRpcsEventuallyGoToGivenServers(test_client, test_servers[1:]) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py b/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py index 4965ac13794..c8449eb9149 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/round_robin_test.py @@ -74,12 +74,12 @@ class RoundRobinTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): self.assertEqual(total_requests_received, num_rpcs, 'Wrong number of RPCS') for server in test_servers: - pod_name = server.pod_name - self.assertIn(pod_name, rpcs_by_peer, - f'pod {pod_name} did not receive RPCs') + hostname = server.hostname + self.assertIn(hostname, rpcs_by_peer, + f'Server {hostname} did not receive RPCs') self.assertLessEqual( - abs(rpcs_by_peer[pod_name] - expected_rpcs_per_replica), 1, - f'Wrong number of RPCs for {pod_name}') + abs(rpcs_by_peer[hostname] - expected_rpcs_per_replica), 1, + f'Wrong number of RPCs for server {hostname}') if __name__ == '__main__':