[PSM Interop] Various improvements to the helper scripts (#32745)

- Fix broken `bin/run_channelz.py` helper
- Create `bin/run_ping_pong.py` helper that runs the baseline (aka
"ping_pong") test against preconfigured infra
- Setup automatic port forwarding when running `bin/run_channelz.py` and
`bin/run_ping_pong.py`
- Create `bin/cleanup_cluster.sh` helper to wipe xds out resources based
namespaces present on the cluster

Note: this involves a small change to the non-helper code, but it's just
moving a the part that makes XdsTestServer/XdsTestClient instance for a
given pod.
pull/32950/head
Sergii Tkachenko 2 years ago committed by GitHub
parent 83cdbfff8c
commit c0ee9ff4d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 88
      tools/run_tests/xds_k8s_test_driver/bin/cleanup_cluster.sh
  2. 13
      tools/run_tests/xds_k8s_test_driver/bin/lib/__init__.py
  3. 157
      tools/run_tests/xds_k8s_test_driver/bin/lib/common.py
  4. 109
      tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py
  5. 141
      tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py
  6. 12
      tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py
  7. 68
      tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py
  8. 53
      tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py
  9. 6
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py
  10. 58
      tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py

@ -0,0 +1,88 @@
#!/usr/bin/env bash
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eo pipefail
SCRIPT_DIR="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
readonly SCRIPT_DIR
readonly XDS_K8S_DRIVER_DIR="${SCRIPT_DIR}/.."
cd "${XDS_K8S_DRIVER_DIR}"
NO_SECURE="yes"
DATE_TO=$(date -Iseconds)
while [[ $# -gt 0 ]]; do
case $1 in
--secure) NO_SECURE=""; shift ;;
--date_to=*) DATE_TO="${1#*=}T00:00:00Z"; shift ;;
*) echo "Unknown argument $1"; exit 1 ;;
esac
done
jq_selector=$(cat <<- 'EOM'
.items[].metadata |
select(
(.name | test("-(client|server)-")) and
(.creationTimestamp < $date_to)
) | .name
EOM
)
mapfile -t namespaces < <(\
kubectl get namespaces --sort-by='{.metadata.creationTimestamp}'\
--selector='owner=xds-k8s-interop-test'\
-o json\
| jq --arg date_to "${DATE_TO}" -r "${jq_selector}"
)
if [[ -z "${namespaces[*]}" ]]; then
echo "All clean."
exit 0
fi
echo "Found namespaces:"
namespaces_joined=$(IFS=,; printf '%s' "${namespaces[*]}")
kubectl get namespaces --sort-by='{.metadata.creationTimestamp}' \
--selector="name in (${namespaces_joined})"
# Suffixes
mapfile -t suffixes < <(\
printf '%s\n' "${namespaces[@]}" | sed -E 's/psm-interop-(server|client)-//'
)
echo
echo "Found suffixes: ${suffixes[*]}"
echo "Run plan:"
for suffix in "${suffixes[@]}"; do
echo ./bin/cleanup.sh ${NO_SECURE:+"--nosecure"} "--resource_suffix=${suffix}"
done
read -r -n 1 -p "Continue? (y/N) " answer
if [[ "$answer" != "${answer#[Yy]}" ]] ;then
echo
echo "Starting the cleanup."
else
echo
echo "Exit"
exit 0
fi
for suffix in "${suffixes[@]}"; do
echo "-------------------- Cleaning suffix ${suffix} --------------------"
set -x
./bin/cleanup.sh ${NO_SECURE:+"--nosecure"} "--resource_suffix=${suffix}"
set +x
echo "-------------------- Finished cleaning ${suffix} --------------------"
done

@ -0,0 +1,13 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,157 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common functionality for bin/ python helpers."""
import atexit
import signal
import sys
from absl import logging
from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.get_absl_logger()
# Type aliases
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
_XdsTestServer = server_app.XdsTestServer
_XdsTestClient = client_app.XdsTestClient
def make_client_namespace(
k8s_api_manager: k8s.KubernetesApiManager) -> k8s.KubernetesNamespace:
namespace_name: str = KubernetesClientRunner.make_namespace_name(
xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value)
return k8s.KubernetesNamespace(k8s_api_manager, namespace_name)
def make_client_runner(namespace: k8s.KubernetesNamespace,
gcp_api_manager: gcp.api.GcpApiManager,
port_forwarding: bool = False,
reuse_namespace: bool = True,
secure: bool = False) -> KubernetesClientRunner:
# KubernetesClientRunner arguments.
runner_kwargs = dict(
deployment_name=xds_flags.CLIENT_NAME.value,
image_name=xds_k8s_flags.CLIENT_IMAGE.value,
td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
gcp_project=xds_flags.PROJECT.value,
gcp_api_manager=gcp_api_manager,
gcp_service_account=xds_k8s_flags.GCP_SERVICE_ACCOUNT.value,
xds_server_uri=xds_flags.XDS_SERVER_URI.value,
network=xds_flags.NETWORK.value,
stats_port=xds_flags.CLIENT_PORT.value,
reuse_namespace=reuse_namespace,
debug_use_port_forwarding=port_forwarding)
if secure:
runner_kwargs.update(
deployment_template='client-secure.deployment.yaml')
return KubernetesClientRunner(namespace, **runner_kwargs)
def make_server_namespace(
k8s_api_manager: k8s.KubernetesApiManager) -> k8s.KubernetesNamespace:
namespace_name: str = KubernetesServerRunner.make_namespace_name(
xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value)
return k8s.KubernetesNamespace(k8s_api_manager, namespace_name)
def make_server_runner(namespace: k8s.KubernetesNamespace,
gcp_api_manager: gcp.api.GcpApiManager,
port_forwarding: bool = False,
reuse_namespace: bool = True,
reuse_service: bool = False,
secure: bool = False) -> KubernetesServerRunner:
# KubernetesServerRunner arguments.
runner_kwargs = dict(
deployment_name=xds_flags.SERVER_NAME.value,
image_name=xds_k8s_flags.SERVER_IMAGE.value,
td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
gcp_project=xds_flags.PROJECT.value,
gcp_api_manager=gcp_api_manager,
gcp_service_account=xds_k8s_flags.GCP_SERVICE_ACCOUNT.value,
network=xds_flags.NETWORK.value,
reuse_namespace=reuse_namespace,
reuse_service=reuse_service,
debug_use_port_forwarding=port_forwarding)
if secure:
runner_kwargs.update(
xds_server_uri=xds_flags.XDS_SERVER_URI.value,
deployment_template='server-secure.deployment.yaml')
return KubernetesServerRunner(namespace, **runner_kwargs)
def _ensure_atexit(signum, frame):
"""Needed to handle signals or atexit handler won't be called."""
del frame
# Pylint is wrong about "Module 'signal' has no 'Signals' member":
# https://docs.python.org/3/library/signal.html#signal.Signals
sig = signal.Signals(signum) # pylint: disable=no-member
logger.warning('Caught %r, initiating graceful shutdown...\n', sig)
sys.exit(1)
def _graceful_exit(server_runner: KubernetesServerRunner,
client_runner: KubernetesClientRunner):
"""Stop port forwarding processes."""
client_runner.stop_pod_dependencies()
server_runner.stop_pod_dependencies()
def register_graceful_exit(server_runner: KubernetesServerRunner,
client_runner: KubernetesClientRunner):
atexit.register(_graceful_exit, server_runner, client_runner)
for signum in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
signal.signal(signum, _ensure_atexit)
def get_client_pod(client_runner: KubernetesClientRunner,
deployment_name: str) -> k8s.V1Pod:
client_deployment: k8s.V1Deployment
client_deployment = client_runner.k8s_namespace.get_deployment(
deployment_name)
client_pod_name: str = client_runner._wait_deployment_pod_count(
client_deployment)[0]
return client_runner._wait_pod_started(client_pod_name)
def get_server_pod(server_runner: KubernetesServerRunner,
deployment_name: str) -> k8s.V1Pod:
server_deployment: k8s.V1Deployment
server_deployment = server_runner.k8s_namespace.get_deployment(
deployment_name)
server_pod_name: str = server_runner._wait_deployment_pod_count(
server_deployment)[0]
return server_runner._wait_pod_started(server_pod_name)
def get_test_server_for_pod(server_runner: KubernetesServerRunner,
server_pod: k8s.V1Pod, **kwargs) -> _XdsTestServer:
return server_runner._xds_test_server_for_pod(server_pod, **kwargs)
def get_test_client_for_pod(client_runner: KubernetesClientRunner,
client_pod: k8s.V1Pod, **kwargs) -> _XdsTestClient:
return client_runner._xds_test_client_for_pod(client_pod, **kwargs)

@ -29,26 +29,21 @@ Typical usage examples:
python -m bin.run_channelz --helpfull
"""
import hashlib
import logging
from absl import app
from absl import flags
from absl import logging
from bin.lib import common
from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.rpc import grpc_channelz
from framework.test_app import client_app
from framework.test_app import server_app
logger = logging.getLogger(__name__)
# Flags
_SERVER_RPC_HOST = flags.DEFINE_string('server_rpc_host',
default='127.0.0.1',
help='Server RPC host')
_CLIENT_RPC_HOST = flags.DEFINE_string('client_rpc_host',
default='127.0.0.1',
help='Client RPC host')
_SECURITY = flags.DEFINE_enum('security',
default=None,
enum_values=[
@ -59,7 +54,13 @@ _SECURITY = flags.DEFINE_enum('security',
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
# Running outside of a test suite, so require explicit resource_suffix.
flags.mark_flag_as_required("resource_suffix")
flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name)
flags.register_validator(xds_flags.SERVER_XDS_PORT.name,
lambda val: val > 0,
message="Run outside of a test suite, must provide"
" the exact port value (must be greater than 0).")
logger = logging.get_absl_logger()
# Type aliases
_Channel = grpc_channelz.Channel
@ -165,8 +166,8 @@ def debug_basic_setup(test_client, test_server):
server_sock: _Socket = test_server.get_server_socket_matching_client(
client_sock)
print(f'Client socket:\n{client_sock}\n')
print(f'Matching server:\n{server_sock}\n')
logger.debug('Client socket: %s\n', client_sock)
logger.debug('Matching server socket: %s\n', server_sock)
def main(argv):
@ -176,45 +177,61 @@ def main(argv):
# Must be called before KubernetesApiManager or GcpApiManager init.
xds_flags.set_socket_default_timeout_from_flag()
# Flags.
should_port_forward: bool = xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value
is_secure: bool = bool(_SECURITY.value)
# Setup.
gcp_api_manager = gcp.api.GcpApiManager()
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
# Resource names.
resource_prefix: str = xds_flags.RESOURCE_PREFIX.value
# Server
server_name = xds_flags.SERVER_NAME.value
server_namespace = resource_prefix
server_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, server_namespace)
server_pod = get_deployment_pods(server_k8s_ns, server_name)[0]
test_server: _XdsTestServer = _XdsTestServer(
ip=server_pod.status.pod_ip,
rpc_port=xds_flags.SERVER_PORT.value,
hostname=server_pod.metadata.name,
xds_host=xds_flags.SERVER_XDS_HOST.value,
xds_port=xds_flags.SERVER_XDS_PORT.value,
rpc_host=_SERVER_RPC_HOST.value)
# Server.
server_namespace = common.make_server_namespace(k8s_api_manager)
server_runner = common.make_server_runner(
server_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure)
# Find server pod.
server_pod: k8s.V1Pod = common.get_server_pod(server_runner,
xds_flags.SERVER_NAME.value)
# Client
client_name = xds_flags.CLIENT_NAME.value
client_namespace = resource_prefix
client_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace)
client_pod = get_deployment_pods(client_k8s_ns, client_name)[0]
test_client: _XdsTestClient = _XdsTestClient(
ip=client_pod.status.pod_ip,
rpc_port=xds_flags.CLIENT_PORT.value,
server_target=test_server.xds_uri,
hostname=client_pod.metadata.name,
rpc_host=_CLIENT_RPC_HOST.value)
if _SECURITY.value in ('mtls', 'tls', 'plaintext'):
debug_security_setup_positive(test_client, test_server)
elif _SECURITY.value == ('mtls_error', 'server_authz_error'):
debug_security_setup_negative(test_client)
else:
debug_basic_setup(test_client, test_server)
test_client.close()
test_server.close()
client_namespace = common.make_client_namespace(k8s_api_manager)
client_runner = common.make_client_runner(
client_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure)
# Find client pod.
client_pod: k8s.V1Pod = common.get_client_pod(client_runner,
xds_flags.CLIENT_NAME.value)
# Ensure port forwarding stopped.
common.register_graceful_exit(server_runner, client_runner)
# Create server app for the server pod.
test_server: _XdsTestServer = common.get_test_server_for_pod(
server_runner,
server_pod,
test_port=xds_flags.SERVER_PORT.value,
secure_mode=is_secure)
test_server.set_xds_address(xds_flags.SERVER_XDS_HOST.value,
xds_flags.SERVER_XDS_PORT.value)
# Create client app for the client pod.
test_client: _XdsTestClient = common.get_test_client_for_pod(
client_runner, client_pod, server_target=test_server.xds_uri)
with test_client, test_server:
if _SECURITY.value in ('mtls', 'tls', 'plaintext'):
debug_security_setup_positive(test_client, test_server)
elif _SECURITY.value in ('mtls_error', 'server_authz_error'):
debug_security_setup_negative(test_client)
else:
debug_basic_setup(test_client, test_server)
logger.info('SUCCESS!')
if __name__ == '__main__':

@ -0,0 +1,141 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from absl import app
from absl import flags
from absl import logging
from bin.lib import common
from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.rpc import grpc_channelz
from framework.rpc import grpc_testing
from framework.test_app import client_app
from framework.test_app import server_app
# Flags
_SECURE = flags.DEFINE_bool(
"secure",
default=False,
help="Set to True if the the client/server were started "
"with the PSM security enabled.")
_NUM_RPCS = flags.DEFINE_integer("num_rpcs",
default=100,
lower_bound=1,
upper_bound=10_000,
help="The number of RPCs to check.")
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
# Running outside of a test suite, so require explicit resource_suffix.
flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name)
flags.register_validator(xds_flags.SERVER_XDS_PORT.name,
lambda val: val > 0,
message="Run outside of a test suite, must provide"
" the exact port value (must be greater than 0).")
logger = logging.get_absl_logger()
# Type aliases
_Channel = grpc_channelz.Channel
_Socket = grpc_channelz.Socket
_ChannelState = grpc_channelz.ChannelState
_XdsTestServer = server_app.XdsTestServer
_XdsTestClient = client_app.XdsTestClient
LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
def get_client_rpc_stats(test_client: _XdsTestClient,
num_rpcs: int) -> LoadBalancerStatsResponse:
lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs)
logger.info('Received LoadBalancerStatsResponse from test client %s:\n%s',
test_client.hostname, lb_stats)
return lb_stats
def run_ping_pong(test_client: _XdsTestClient, num_rpcs: int):
test_client.wait_for_active_server_channel()
lb_stats = get_client_rpc_stats(test_client, num_rpcs)
for backend, rpcs_count in lb_stats.rpcs_by_peer.items():
if int(rpcs_count) < 1:
raise AssertionError(
f'Backend {backend} did not receive a single RPC')
failed = int(lb_stats.num_failures)
if int(lb_stats.num_failures) > 0:
raise AssertionError(
f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
def main(argv):
if len(argv) > 1:
raise app.UsageError('Too many command-line arguments.')
# Must be called before KubernetesApiManager or GcpApiManager init.
xds_flags.set_socket_default_timeout_from_flag()
# Flags.
should_port_forward: bool = xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value
is_secure: bool = _SECURE.value
# Setup.
gcp_api_manager = gcp.api.GcpApiManager()
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
# Server.
server_namespace = common.make_server_namespace(k8s_api_manager)
server_runner = common.make_server_runner(
server_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure)
# Find server pod.
server_pod: k8s.V1Pod = common.get_server_pod(server_runner,
xds_flags.SERVER_NAME.value)
# Client
client_namespace = common.make_client_namespace(k8s_api_manager)
client_runner = common.make_client_runner(
client_namespace,
gcp_api_manager,
port_forwarding=should_port_forward,
secure=is_secure)
# Find client pod.
client_pod: k8s.V1Pod = common.get_client_pod(client_runner,
xds_flags.CLIENT_NAME.value)
# Ensure port forwarding stopped.
common.register_graceful_exit(server_runner, client_runner)
# Create server app for the server pod.
test_server: _XdsTestServer = common.get_test_server_for_pod(
server_runner,
server_pod,
test_port=xds_flags.SERVER_PORT.value,
secure_mode=is_secure)
test_server.set_xds_address(xds_flags.SERVER_XDS_HOST.value,
xds_flags.SERVER_XDS_PORT.value)
# Create client app for the client pod.
test_client: _XdsTestClient = common.get_test_client_for_pod(
client_runner, client_pod, server_target=test_server.xds_uri)
with test_client, test_server:
run_ping_pong(test_client, _NUM_RPCS.value)
logger.info('SUCCESS!')
if __name__ == '__main__':
app.run(main)

@ -62,7 +62,17 @@ _SECURITY = flags.DEFINE_enum('security',
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
# Running outside of a test suite, so require explicit resource_suffix.
flags.mark_flag_as_required("resource_suffix")
flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name)
@flags.multi_flags_validator((xds_flags.SERVER_XDS_PORT.name, _CMD.name),
message="Run outside of a test suite, must provide"
" the exact port value (must be greater than 0).")
def _check_server_xds_port_flag(flags_dict):
if flags_dict[_CMD.name] not in ('create', 'cycle'):
return True
return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0
# Type aliases
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner

@ -17,11 +17,11 @@ import signal
from absl import app
from absl import flags
from bin.lib import common
from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.runners.k8s import k8s_xds_client_runner
logger = logging.getLogger(__name__)
# Flags
@ -36,10 +36,12 @@ _QPS = flags.DEFINE_integer('qps', default=25, help='Queries per second')
_PRINT_RESPONSE = flags.DEFINE_bool("print_response",
default=False,
help="Client prints responses")
_FOLLOW = flags.DEFINE_bool("follow",
default=False,
help="Follow pod logs. "
"Requires --collect_app_logs")
_FOLLOW = flags.DEFINE_bool(
"follow",
default=False,
help=
"Follow pod logs. Requires --collect_app_logs or --debug_use_port_forwarding"
)
_CONFIG_MESH = flags.DEFINE_bool(
"config_mesh",
default=None,
@ -54,13 +56,19 @@ _CLEANUP_NAMESPACE = flags.DEFINE_bool(
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
# Running outside of a test suite, so require explicit resource_suffix.
flags.mark_flag_as_required("resource_suffix")
flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name)
# Type aliases
_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
@flags.multi_flags_validator((xds_flags.SERVER_XDS_PORT.name, _CMD.name),
message="Run outside of a test suite, must provide"
" the exact port value (must be greater than 0).")
def _check_server_xds_port_flag(flags_dict):
if flags_dict[_CMD.name] == "cleanup":
return True
return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0
def make_sigint_handler(client_runner: _KubernetesClientRunner):
def _make_sigint_handler(client_runner: common.KubernetesClientRunner):
def sigint_handler(sig, frame):
del sig, frame
@ -77,33 +85,21 @@ def main(argv):
# Must be called before KubernetesApiManager or GcpApiManager init.
xds_flags.set_socket_default_timeout_from_flag()
project: str = xds_flags.PROJECT.value
# GCP Service Account email
gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value
# KubernetesClientRunner arguments.
runner_kwargs = dict(
deployment_name=xds_flags.CLIENT_NAME.value,
image_name=xds_k8s_flags.CLIENT_IMAGE.value,
td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
gcp_project=project,
gcp_api_manager=gcp.api.GcpApiManager(),
gcp_service_account=gcp_service_account,
xds_server_uri=xds_flags.XDS_SERVER_URI.value,
network=xds_flags.NETWORK.value,
stats_port=xds_flags.CLIENT_PORT.value,
reuse_namespace=_REUSE_NAMESPACE.value)
if _SECURE.value:
runner_kwargs.update(
deployment_template='client-secure.deployment.yaml')
# Log following and port forwarding.
should_follow_logs = _FOLLOW.value and xds_flags.COLLECT_APP_LOGS.value
should_port_forward = (should_follow_logs and
xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value)
# Setup.
gcp_api_manager = gcp.api.GcpApiManager()
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
client_namespace = _KubernetesClientRunner.make_namespace_name(
xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value)
client_runner = _KubernetesClientRunner(
k8s.KubernetesNamespace(k8s_api_manager, client_namespace),
**runner_kwargs)
client_namespace = common.make_client_namespace(k8s_api_manager)
client_runner = common.make_client_runner(
client_namespace,
gcp_api_manager,
reuse_namespace=_REUSE_NAMESPACE.value,
secure=_SECURE.value,
port_forwarding=should_port_forward)
# Server target
server_xds_host = xds_flags.SERVER_XDS_HOST.value
@ -118,9 +114,9 @@ def main(argv):
secure_mode=_SECURE.value,
config_mesh=_CONFIG_MESH.value,
log_to_stdout=_FOLLOW.value)
if client_runner.should_collect_logs and _FOLLOW.value:
if should_follow_logs:
print('Following pod logs. Press Ctrl+C top stop')
signal.signal(signal.SIGINT, make_sigint_handler(client_runner))
signal.signal(signal.SIGINT, _make_sigint_handler(client_runner))
signal.pause()
elif _CMD.value == 'cleanup':

@ -17,11 +17,11 @@ import signal
from absl import app
from absl import flags
from bin.lib import common
from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app.runners.k8s import k8s_xds_server_runner
logger = logging.getLogger(__name__)
# Flags
@ -51,11 +51,8 @@ flags.adopt_module_key_flags(xds_k8s_flags)
# Running outside of a test suite, so require explicit resource_suffix.
flags.mark_flag_as_required("resource_suffix")
# Type aliases
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
def make_sigint_handler(server_runner: _KubernetesServerRunner):
def _make_sigint_handler(server_runner: common.KubernetesServerRunner):
def sigint_handler(sig, frame):
del sig, frame
@ -72,37 +69,21 @@ def main(argv):
# Must be called before KubernetesApiManager or GcpApiManager init.
xds_flags.set_socket_default_timeout_from_flag()
project: str = xds_flags.PROJECT.value
# GCP Service Account email
gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value
# Resource names.
resource_prefix: str = xds_flags.RESOURCE_PREFIX.value
resource_suffix: str = xds_flags.RESOURCE_SUFFIX.value
# KubernetesServerRunner arguments.
runner_kwargs = dict(
deployment_name=xds_flags.SERVER_NAME.value,
image_name=xds_k8s_flags.SERVER_IMAGE.value,
td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
gcp_project=project,
gcp_api_manager=gcp.api.GcpApiManager(),
gcp_service_account=gcp_service_account,
network=xds_flags.NETWORK.value,
reuse_namespace=_REUSE_NAMESPACE.value,
reuse_service=_REUSE_SERVICE.value)
if _SECURE.value:
runner_kwargs.update(
xds_server_uri=xds_flags.XDS_SERVER_URI.value,
deployment_template='server-secure.deployment.yaml')
should_follow_logs = _FOLLOW.value and xds_flags.COLLECT_APP_LOGS.value
should_port_forward = (should_follow_logs and
xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value)
# Setup.
gcp_api_manager = gcp.api.GcpApiManager()
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
server_namespace = _KubernetesServerRunner.make_namespace_name(
resource_prefix, resource_suffix)
server_runner = _KubernetesServerRunner(
k8s.KubernetesNamespace(k8s_api_manager, server_namespace),
**runner_kwargs)
server_namespace = common.make_server_namespace(k8s_api_manager)
server_runner = common.make_server_runner(
server_namespace,
gcp_api_manager,
reuse_namespace=_REUSE_NAMESPACE.value,
reuse_service=_REUSE_SERVICE.value,
secure=_SECURE.value,
port_forwarding=should_port_forward)
if _CMD.value == 'run':
logger.info('Run server, secure_mode=%s', _SECURE.value)
@ -111,9 +92,9 @@ def main(argv):
maintenance_port=xds_flags.SERVER_MAINTENANCE_PORT.value,
secure_mode=_SECURE.value,
log_to_stdout=_FOLLOW.value)
if server_runner.should_collect_logs and _FOLLOW.value:
if should_follow_logs:
print('Following pod logs. Press Ctrl+C top stop')
signal.signal(signal.SIGINT, make_sigint_handler(server_runner))
signal.signal(signal.SIGINT, _make_sigint_handler(server_runner))
signal.pause()
elif _CMD.value == 'cleanup':

@ -149,6 +149,10 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
# Verify the deployment reports all pods started as well.
self._wait_deployment_with_available_replicas(self.deployment_name)
return self._xds_test_client_for_pod(pod, server_target=server_target)
def _xds_test_client_for_pod(self, pod: k8s.V1Pod, *,
server_target: str) -> XdsTestClient:
if self.debug_use_port_forwarding:
pf = self._start_port_forwarding_pod(pod, self.stats_port)
rpc_port, rpc_host = pf.local_port, pf.local_address
@ -158,7 +162,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
return XdsTestClient(ip=pod.status.pod_ip,
rpc_port=rpc_port,
server_target=server_target,
hostname=pod_name,
hostname=pod.metadata.name,
rpc_host=rpc_host)
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ

@ -103,16 +103,13 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
secure_mode: bool = False,
replica_count: int = 1,
log_to_stdout: bool = False) -> List[XdsTestServer]:
if not maintenance_port:
maintenance_port = self._get_default_maintenance_port(secure_mode)
# Implementation detail: in secure mode, maintenance ("backchannel")
# port must be different from the test port so communication with
# maintenance services can be reached independently from the security
# maintenance services can be reached independently of the security
# configuration under test.
if maintenance_port is None:
if not secure_mode:
maintenance_port = self.DEFAULT_MAINTENANCE_PORT
else:
maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
if secure_mode and maintenance_port == test_port:
raise ValueError('port and maintenance_port must be different '
'when running test server in secure mode')
@ -193,23 +190,44 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
# Verify the deployment reports all pods started as well.
self._wait_deployment_with_available_replicas(self.deployment_name,
replica_count)
servers = []
servers: List[XdsTestServer] = []
for pod in pods:
if self.debug_use_port_forwarding:
pf = self._start_port_forwarding_pod(pod, maintenance_port)
rpc_port, rpc_host = pf.local_port, pf.local_address
else:
rpc_port, rpc_host = maintenance_port, None
servers.append(
XdsTestServer(ip=pod.status.pod_ip,
rpc_port=test_port,
hostname=pod.metadata.name,
maintenance_port=rpc_port,
secure_mode=secure_mode,
rpc_host=rpc_host))
self._xds_test_server_for_pod(pod,
test_port=test_port,
maintenance_port=maintenance_port,
secure_mode=secure_mode))
return servers
def _get_default_maintenance_port(self, secure_mode: bool) -> int:
if not secure_mode:
maintenance_port = self.DEFAULT_MAINTENANCE_PORT
else:
maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
return maintenance_port
def _xds_test_server_for_pod(self,
pod: k8s.V1Pod,
*,
test_port: int = DEFAULT_TEST_PORT,
maintenance_port: Optional[int] = None,
secure_mode: bool = False) -> XdsTestServer:
if maintenance_port is None:
maintenance_port = self._get_default_maintenance_port(secure_mode)
if self.debug_use_port_forwarding:
pf = self._start_port_forwarding_pod(pod, maintenance_port)
rpc_port, rpc_host = pf.local_port, pf.local_address
else:
rpc_port, rpc_host = maintenance_port, None
return XdsTestServer(ip=pod.status.pod_ip,
rpc_port=test_port,
hostname=pod.metadata.name,
maintenance_port=rpc_port,
secure_mode=secure_mode,
rpc_host=rpc_host)
def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ
if self.deployment or force:
self._delete_deployment(self.deployment_name)

Loading…
Cancel
Save