diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py index 256982564ac..3d389696f17 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py @@ -14,6 +14,7 @@ import functools import json import logging +import re import subprocess import time from typing import List, Optional, Tuple @@ -78,9 +79,96 @@ class PortForwardingError(Exception): """Error forwarding port""" +class PortForwarder: + PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1' + + def __init__(self, + context: str, + namespace: str, + destination: str, + remote_port: int, + local_port: Optional[int] = None, + local_address: Optional[str] = None): + self.context = context + self.namespace = namespace + self.destination = destination + self.remote_port = remote_port + self.local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS + self.local_port: Optional[int] = local_port + self.subprocess: Optional[subprocess.Popen] = None + + def connect(self) -> None: + port_mapping = f"{self.local_port}:{self.remote_port}" if self.local_port else f":{self.remote_port}" + cmd = [ + "kubectl", "--context", self.context, "--namespace", self.namespace, + "port-forward", "--address", self.local_address, self.destination, + port_mapping + ] + self.subprocess = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True) + # Wait for stdout line indicating successful start. + if self.local_port: + local_port_expected = ( + f"Forwarding from {self.local_address}:{self.local_port}" + f" -> {self.remote_port}") + else: + local_port_re = re.compile( + f"Forwarding from {self.local_address}:([0-9]+) -> {self.remote_port}" + ) + try: + while True: + time.sleep(0.05) + output = self.subprocess.stdout.readline().strip() + if not output: + return_code = self.subprocess.poll() + if return_code is not None: + errors = [ + error + for error in self.subprocess.stdout.readlines() + ] + raise PortForwardingError( + 'Error forwarding port, kubectl return ' + f'code {return_code}, output {errors}') + # If there is no output, and the subprocess is not exiting, + # continue waiting for the log line. + continue + + # Validate output log + if self.local_port: + if output != local_port_expected: + raise PortForwardingError( + f'Error forwarding port, unexpected output {output}' + ) + else: + groups = local_port_re.search(output) + if groups is None: + raise PortForwardingError( + f'Error forwarding port, unexpected output {output}' + ) + # Update local port to the randomly picked one + self.local_port = int(groups[1]) + + logger.info(output) + break + except Exception: + self.close() + raise + + def close(self) -> None: + if self.subprocess is not None: + logger.info('Shutting down port forwarding, pid %s', + self.subprocess.pid) + self.subprocess.kill() + stdout, _ = self.subprocess.communicate(timeout=5) + logger.info('Port forwarding stopped') + logger.debug('Port forwarding remaining stdout: %s', stdout) + self.subprocess = None + + class KubernetesNamespace: NEG_STATUS_META = 'cloud.google.com/neg-status' - PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1' DELETE_GRACE_PERIOD_SEC: int = 5 WAIT_SHORT_TIMEOUT_SEC: int = 60 WAIT_SHORT_SLEEP_SEC: int = 1 @@ -302,54 +390,13 @@ class KubernetesNamespace: remote_port: int, local_port: Optional[int] = None, local_address: Optional[str] = None, - ) -> subprocess.Popen: - """Experimental""" - local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS - local_port = local_port or remote_port - cmd = [ - "kubectl", "--context", self.api.context, "--namespace", self.name, - "port-forward", "--address", local_address, - f"pod/{pod.metadata.name}", f"{local_port}:{remote_port}" - ] - pf = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True) - # Wait for stdout line indicating successful start. - expected = (f"Forwarding from {local_address}:{local_port}" - f" -> {remote_port}") - try: - while True: - time.sleep(0.05) - output = pf.stdout.readline().strip() - if not output: - return_code = pf.poll() - if return_code is not None: - errors = [error for error in pf.stdout.readlines()] - raise PortForwardingError( - 'Error forwarding port, kubectl return ' - f'code {return_code}, output {errors}') - elif output != expected: - raise PortForwardingError( - f'Error forwarding port, unexpected output {output}') - else: - logger.info(output) - break - except Exception: - self.port_forward_stop(pf) - raise - - # TODO(sergiitk): return new PortForwarder object + ) -> PortForwarder: + pf = PortForwarder(self.api.context, self.name, + f"pod/{pod.metadata.name}", remote_port, local_port, + local_address) + pf.connect() return pf - @staticmethod - def port_forward_stop(pf): - logger.info('Shutting down port forwarding, pid %s', pf.pid) - pf.kill() - stdout, _stderr = pf.communicate(timeout=5) - logger.info('Port forwarding stopped') - logger.debug('Port forwarding remaining stdout: %s', stdout) - @staticmethod def _pod_started(pod: V1Pod): return pod.status.phase not in ('Pending', 'Unknown') diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index 4ddfa3ebf55..c5d8c04069d 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -281,7 +281,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): # Mutable state self.deployment: Optional[k8s.V1Deployment] = None self.service_account: Optional[k8s.V1ServiceAccount] = None - self.port_forwarder = None + self.port_forwarder: Optional[k8s.PortForwarder] = None # TODO(sergiitk): make rpc UnaryCall enum or get it from proto def run(self, @@ -344,6 +344,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0] self._wait_pod_started(pod.metadata.name) pod_ip = pod.status.pod_ip + rpc_port = self.stats_port rpc_host = None # Experimental, for local debugging. @@ -352,16 +353,17 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner): pod_ip, self.stats_port) self.port_forwarder = self.k8s_namespace.port_forward_pod( pod, remote_port=self.stats_port) - rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS + rpc_port = self.port_forwarder.local_port + rpc_host = self.port_forwarder.local_address return XdsTestClient(ip=pod_ip, - rpc_port=self.stats_port, + rpc_port=rpc_port, server_target=server_target, rpc_host=rpc_host) def cleanup(self, *, force=False, force_namespace=False): if self.port_forwarder: - self.k8s_namespace.port_forward_stop(self.port_forwarder) + self.port_forwarder.close() self.port_forwarder = None if self.deployment or force: self._delete_deployment(self.deployment_name) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py index bc2e092af93..3d583f66d80 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py @@ -158,9 +158,6 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): DEFAULT_MAINTENANCE_PORT = 8080 DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081 - _lock = threading.Lock() - _server_port_forwarding_offset = 0 - def __init__(self, k8s_namespace, *, @@ -224,7 +221,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): self.deployment: Optional[k8s.V1Deployment] = None self.service_account: Optional[k8s.V1ServiceAccount] = None self.service: Optional[k8s.V1Service] = None - self.port_forwarders = [] + self.port_forwarders: List[k8s.PortForwarder] = [] def run(self, *, @@ -328,18 +325,13 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): # Experimental, for local debugging. local_port = maintenance_port if self.debug_use_port_forwarding: - with KubernetesServerRunner._lock: - local_port = maintenance_port + KubernetesServerRunner._server_port_forwarding_offset - KubernetesServerRunner._server_port_forwarding_offset += 1 - logger.info( - 'LOCAL DEV MODE: Enabling port forwarding to %s:%s using local port %s', - pod_ip, maintenance_port, local_port) - self.port_forwarders.append( - self.k8s_namespace.port_forward_pod( - pod, - remote_port=maintenance_port, - local_port=local_port)) - rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS + logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s', + pod_ip, maintenance_port) + port_forwarder = self.k8s_namespace.port_forward_pod( + pod, remote_port=maintenance_port) + self.port_forwarders.append(port_forwarder) + local_port = port_forwarder.local_port + rpc_host = port_forwarder.local_address servers.append( XdsTestServer(ip=pod_ip, @@ -354,7 +346,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner): def cleanup(self, *, force=False, force_namespace=False): if self.port_forwarders: for port_forwarder in self.port_forwarders: - self.k8s_namespace.port_forward_stop(port_forwarder) + port_forwarder.close() self.port_forwarders = [] if self.deployment or force: self._delete_deployment(self.deployment_name) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py index 44d5fad8733..17b64eeede4 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py @@ -43,9 +43,6 @@ UrlMapType = Any HostRule = Any PathMatcher = Any -_BackendHTTP2 = gcp.compute.ComputeV1.BackendServiceProtocol.HTTP2 -_COMPUTE_V1_URL_PREFIX = 'https://www.googleapis.com/compute/v1' - class _UrlMapChangeAggregator: """Where all the urlMap change happens."""