[xDS GKE] use randomized local forwarding port for parallism (#28694)

* [xDS GKE] use randomized local forwarding port for parallism

* Implement a PortForwarder class

* Add missing types and remove unused code

* Correct the error path

* Split the connect logic from init
pull/28708/head
Lidi Zheng 3 years ago committed by GitHub
parent ac139598f0
commit 890a9de53e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 141
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
  2. 10
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
  3. 26
      tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
  4. 3
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py

@ -14,6 +14,7 @@
import functools
import json
import logging
import re
import subprocess
import time
from typing import List, Optional, Tuple
@ -78,9 +79,96 @@ class PortForwardingError(Exception):
"""Error forwarding port"""
class PortForwarder:
PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1'
def __init__(self,
context: str,
namespace: str,
destination: str,
remote_port: int,
local_port: Optional[int] = None,
local_address: Optional[str] = None):
self.context = context
self.namespace = namespace
self.destination = destination
self.remote_port = remote_port
self.local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS
self.local_port: Optional[int] = local_port
self.subprocess: Optional[subprocess.Popen] = None
def connect(self) -> None:
port_mapping = f"{self.local_port}:{self.remote_port}" if self.local_port else f":{self.remote_port}"
cmd = [
"kubectl", "--context", self.context, "--namespace", self.namespace,
"port-forward", "--address", self.local_address, self.destination,
port_mapping
]
self.subprocess = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
# Wait for stdout line indicating successful start.
if self.local_port:
local_port_expected = (
f"Forwarding from {self.local_address}:{self.local_port}"
f" -> {self.remote_port}")
else:
local_port_re = re.compile(
f"Forwarding from {self.local_address}:([0-9]+) -> {self.remote_port}"
)
try:
while True:
time.sleep(0.05)
output = self.subprocess.stdout.readline().strip()
if not output:
return_code = self.subprocess.poll()
if return_code is not None:
errors = [
error
for error in self.subprocess.stdout.readlines()
]
raise PortForwardingError(
'Error forwarding port, kubectl return '
f'code {return_code}, output {errors}')
# If there is no output, and the subprocess is not exiting,
# continue waiting for the log line.
continue
# Validate output log
if self.local_port:
if output != local_port_expected:
raise PortForwardingError(
f'Error forwarding port, unexpected output {output}'
)
else:
groups = local_port_re.search(output)
if groups is None:
raise PortForwardingError(
f'Error forwarding port, unexpected output {output}'
)
# Update local port to the randomly picked one
self.local_port = int(groups[1])
logger.info(output)
break
except Exception:
self.close()
raise
def close(self) -> None:
if self.subprocess is not None:
logger.info('Shutting down port forwarding, pid %s',
self.subprocess.pid)
self.subprocess.kill()
stdout, _ = self.subprocess.communicate(timeout=5)
logger.info('Port forwarding stopped')
logger.debug('Port forwarding remaining stdout: %s', stdout)
self.subprocess = None
class KubernetesNamespace:
NEG_STATUS_META = 'cloud.google.com/neg-status'
PORT_FORWARD_LOCAL_ADDRESS: str = '127.0.0.1'
DELETE_GRACE_PERIOD_SEC: int = 5
WAIT_SHORT_TIMEOUT_SEC: int = 60
WAIT_SHORT_SLEEP_SEC: int = 1
@ -302,54 +390,13 @@ class KubernetesNamespace:
remote_port: int,
local_port: Optional[int] = None,
local_address: Optional[str] = None,
) -> subprocess.Popen:
"""Experimental"""
local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS
local_port = local_port or remote_port
cmd = [
"kubectl", "--context", self.api.context, "--namespace", self.name,
"port-forward", "--address", local_address,
f"pod/{pod.metadata.name}", f"{local_port}:{remote_port}"
]
pf = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
# Wait for stdout line indicating successful start.
expected = (f"Forwarding from {local_address}:{local_port}"
f" -> {remote_port}")
try:
while True:
time.sleep(0.05)
output = pf.stdout.readline().strip()
if not output:
return_code = pf.poll()
if return_code is not None:
errors = [error for error in pf.stdout.readlines()]
raise PortForwardingError(
'Error forwarding port, kubectl return '
f'code {return_code}, output {errors}')
elif output != expected:
raise PortForwardingError(
f'Error forwarding port, unexpected output {output}')
else:
logger.info(output)
break
except Exception:
self.port_forward_stop(pf)
raise
# TODO(sergiitk): return new PortForwarder object
) -> PortForwarder:
pf = PortForwarder(self.api.context, self.name,
f"pod/{pod.metadata.name}", remote_port, local_port,
local_address)
pf.connect()
return pf
@staticmethod
def port_forward_stop(pf):
logger.info('Shutting down port forwarding, pid %s', pf.pid)
pf.kill()
stdout, _stderr = pf.communicate(timeout=5)
logger.info('Port forwarding stopped')
logger.debug('Port forwarding remaining stdout: %s', stdout)
@staticmethod
def _pod_started(pod: V1Pod):
return pod.status.phase not in ('Pending', 'Unknown')

@ -281,7 +281,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
# Mutable state
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
self.port_forwarder = None
self.port_forwarder: Optional[k8s.PortForwarder] = None
# TODO(sergiitk): make rpc UnaryCall enum or get it from proto
def run(self,
@ -344,6 +344,7 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
pod = self.k8s_namespace.list_deployment_pods(self.deployment)[0]
self._wait_pod_started(pod.metadata.name)
pod_ip = pod.status.pod_ip
rpc_port = self.stats_port
rpc_host = None
# Experimental, for local debugging.
@ -352,16 +353,17 @@ class KubernetesClientRunner(base_runner.KubernetesBaseRunner):
pod_ip, self.stats_port)
self.port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=self.stats_port)
rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS
rpc_port = self.port_forwarder.local_port
rpc_host = self.port_forwarder.local_address
return XdsTestClient(ip=pod_ip,
rpc_port=self.stats_port,
rpc_port=rpc_port,
server_target=server_target,
rpc_host=rpc_host)
def cleanup(self, *, force=False, force_namespace=False):
if self.port_forwarder:
self.k8s_namespace.port_forward_stop(self.port_forwarder)
self.port_forwarder.close()
self.port_forwarder = None
if self.deployment or force:
self._delete_deployment(self.deployment_name)

@ -158,9 +158,6 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
DEFAULT_MAINTENANCE_PORT = 8080
DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
_lock = threading.Lock()
_server_port_forwarding_offset = 0
def __init__(self,
k8s_namespace,
*,
@ -224,7 +221,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
self.deployment: Optional[k8s.V1Deployment] = None
self.service_account: Optional[k8s.V1ServiceAccount] = None
self.service: Optional[k8s.V1Service] = None
self.port_forwarders = []
self.port_forwarders: List[k8s.PortForwarder] = []
def run(self,
*,
@ -328,18 +325,13 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
# Experimental, for local debugging.
local_port = maintenance_port
if self.debug_use_port_forwarding:
with KubernetesServerRunner._lock:
local_port = maintenance_port + KubernetesServerRunner._server_port_forwarding_offset
KubernetesServerRunner._server_port_forwarding_offset += 1
logger.info(
'LOCAL DEV MODE: Enabling port forwarding to %s:%s using local port %s',
pod_ip, maintenance_port, local_port)
self.port_forwarders.append(
self.k8s_namespace.port_forward_pod(
pod,
remote_port=maintenance_port,
local_port=local_port))
rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS
logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
pod_ip, maintenance_port)
port_forwarder = self.k8s_namespace.port_forward_pod(
pod, remote_port=maintenance_port)
self.port_forwarders.append(port_forwarder)
local_port = port_forwarder.local_port
rpc_host = port_forwarder.local_address
servers.append(
XdsTestServer(ip=pod_ip,
@ -354,7 +346,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
def cleanup(self, *, force=False, force_namespace=False):
if self.port_forwarders:
for port_forwarder in self.port_forwarders:
self.k8s_namespace.port_forward_stop(port_forwarder)
port_forwarder.close()
self.port_forwarders = []
if self.deployment or force:
self._delete_deployment(self.deployment_name)

@ -43,9 +43,6 @@ UrlMapType = Any
HostRule = Any
PathMatcher = Any
_BackendHTTP2 = gcp.compute.ComputeV1.BackendServiceProtocol.HTTP2
_COMPUTE_V1_URL_PREFIX = 'https://www.googleapis.com/compute/v1'
class _UrlMapChangeAggregator:
"""Where all the urlMap change happens."""

Loading…
Cancel
Save