From c0ee9ff4d512426a8212e9bd7b10bb0b3bebb2a3 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Wed, 26 Apr 2023 15:58:02 -0400 Subject: [PATCH] [PSM Interop] Various improvements to the helper scripts (#32745) - Fix broken `bin/run_channelz.py` helper - Create `bin/run_ping_pong.py` helper that runs the baseline (aka "ping_pong") test against preconfigured infra - Setup automatic port forwarding when running `bin/run_channelz.py` and `bin/run_ping_pong.py` - Create `bin/cleanup_cluster.sh` helper to wipe xds out resources based namespaces present on the cluster Note: this involves a small change to the non-helper code, but it's just moving a the part that makes XdsTestServer/XdsTestClient instance for a given pod. --- .../bin/cleanup_cluster.sh | 88 ++++++++++ .../xds_k8s_test_driver/bin/lib/__init__.py | 13 ++ .../xds_k8s_test_driver/bin/lib/common.py | 157 ++++++++++++++++++ .../xds_k8s_test_driver/bin/run_channelz.py | 109 +++++++----- .../xds_k8s_test_driver/bin/run_ping_pong.py | 141 ++++++++++++++++ .../xds_k8s_test_driver/bin/run_td_setup.py | 12 +- .../bin/run_test_client.py | 68 ++++---- .../bin/run_test_server.py | 53 ++---- .../runners/k8s/k8s_xds_client_runner.py | 6 +- .../runners/k8s/k8s_xds_server_runner.py | 58 ++++--- 10 files changed, 565 insertions(+), 140 deletions(-) create mode 100755 tools/run_tests/xds_k8s_test_driver/bin/cleanup_cluster.sh create mode 100644 tools/run_tests/xds_k8s_test_driver/bin/lib/__init__.py create mode 100755 tools/run_tests/xds_k8s_test_driver/bin/lib/common.py create mode 100755 tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py diff --git a/tools/run_tests/xds_k8s_test_driver/bin/cleanup_cluster.sh b/tools/run_tests/xds_k8s_test_driver/bin/cleanup_cluster.sh new file mode 100755 index 00000000000..0080d532a6a --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/bin/cleanup_cluster.sh @@ -0,0 +1,88 @@ +#!/usr/bin/env bash +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eo pipefail + +SCRIPT_DIR="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" +readonly SCRIPT_DIR +readonly XDS_K8S_DRIVER_DIR="${SCRIPT_DIR}/.." + +cd "${XDS_K8S_DRIVER_DIR}" + +NO_SECURE="yes" +DATE_TO=$(date -Iseconds) + +while [[ $# -gt 0 ]]; do + case $1 in + --secure) NO_SECURE=""; shift ;; + --date_to=*) DATE_TO="${1#*=}T00:00:00Z"; shift ;; + *) echo "Unknown argument $1"; exit 1 ;; + esac +done + +jq_selector=$(cat <<- 'EOM' + .items[].metadata | + select( + (.name | test("-(client|server)-")) and + (.creationTimestamp < $date_to) + ) | .name +EOM +) + +mapfile -t namespaces < <(\ + kubectl get namespaces --sort-by='{.metadata.creationTimestamp}'\ + --selector='owner=xds-k8s-interop-test'\ + -o json\ + | jq --arg date_to "${DATE_TO}" -r "${jq_selector}" +) + +if [[ -z "${namespaces[*]}" ]]; then + echo "All clean." + exit 0 +fi + +echo "Found namespaces:" +namespaces_joined=$(IFS=,; printf '%s' "${namespaces[*]}") +kubectl get namespaces --sort-by='{.metadata.creationTimestamp}' \ + --selector="name in (${namespaces_joined})" +# Suffixes +mapfile -t suffixes < <(\ + printf '%s\n' "${namespaces[@]}" | sed -E 's/psm-interop-(server|client)-//' +) +echo +echo "Found suffixes: ${suffixes[*]}" + +echo "Run plan:" +for suffix in "${suffixes[@]}"; do + echo ./bin/cleanup.sh ${NO_SECURE:+"--nosecure"} "--resource_suffix=${suffix}" +done + +read -r -n 1 -p "Continue? (y/N) " answer +if [[ "$answer" != "${answer#[Yy]}" ]] ;then + echo + echo "Starting the cleanup." +else + echo + echo "Exit" + exit 0 +fi + +for suffix in "${suffixes[@]}"; do + echo "-------------------- Cleaning suffix ${suffix} --------------------" + set -x + ./bin/cleanup.sh ${NO_SECURE:+"--nosecure"} "--resource_suffix=${suffix}" + set +x + echo "-------------------- Finished cleaning ${suffix} --------------------" +done diff --git a/tools/run_tests/xds_k8s_test_driver/bin/lib/__init__.py b/tools/run_tests/xds_k8s_test_driver/bin/lib/__init__.py new file mode 100644 index 00000000000..d921d237a32 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/bin/lib/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tools/run_tests/xds_k8s_test_driver/bin/lib/common.py b/tools/run_tests/xds_k8s_test_driver/bin/lib/common.py new file mode 100755 index 00000000000..0c38a39d23a --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/bin/lib/common.py @@ -0,0 +1,157 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Common functionality for bin/ python helpers.""" +import atexit +import signal +import sys + +from absl import logging + +from framework import xds_flags +from framework import xds_k8s_flags +from framework.infrastructure import gcp +from framework.infrastructure import k8s +from framework.test_app import client_app +from framework.test_app import server_app +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner + +logger = logging.get_absl_logger() + +# Type aliases +KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +_XdsTestServer = server_app.XdsTestServer +_XdsTestClient = client_app.XdsTestClient + + +def make_client_namespace( + k8s_api_manager: k8s.KubernetesApiManager) -> k8s.KubernetesNamespace: + namespace_name: str = KubernetesClientRunner.make_namespace_name( + xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value) + return k8s.KubernetesNamespace(k8s_api_manager, namespace_name) + + +def make_client_runner(namespace: k8s.KubernetesNamespace, + gcp_api_manager: gcp.api.GcpApiManager, + port_forwarding: bool = False, + reuse_namespace: bool = True, + secure: bool = False) -> KubernetesClientRunner: + # KubernetesClientRunner arguments. + runner_kwargs = dict( + deployment_name=xds_flags.CLIENT_NAME.value, + image_name=xds_k8s_flags.CLIENT_IMAGE.value, + td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value, + gcp_project=xds_flags.PROJECT.value, + gcp_api_manager=gcp_api_manager, + gcp_service_account=xds_k8s_flags.GCP_SERVICE_ACCOUNT.value, + xds_server_uri=xds_flags.XDS_SERVER_URI.value, + network=xds_flags.NETWORK.value, + stats_port=xds_flags.CLIENT_PORT.value, + reuse_namespace=reuse_namespace, + debug_use_port_forwarding=port_forwarding) + + if secure: + runner_kwargs.update( + deployment_template='client-secure.deployment.yaml') + return KubernetesClientRunner(namespace, **runner_kwargs) + + +def make_server_namespace( + k8s_api_manager: k8s.KubernetesApiManager) -> k8s.KubernetesNamespace: + namespace_name: str = KubernetesServerRunner.make_namespace_name( + xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value) + return k8s.KubernetesNamespace(k8s_api_manager, namespace_name) + + +def make_server_runner(namespace: k8s.KubernetesNamespace, + gcp_api_manager: gcp.api.GcpApiManager, + port_forwarding: bool = False, + reuse_namespace: bool = True, + reuse_service: bool = False, + secure: bool = False) -> KubernetesServerRunner: + # KubernetesServerRunner arguments. + runner_kwargs = dict( + deployment_name=xds_flags.SERVER_NAME.value, + image_name=xds_k8s_flags.SERVER_IMAGE.value, + td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value, + gcp_project=xds_flags.PROJECT.value, + gcp_api_manager=gcp_api_manager, + gcp_service_account=xds_k8s_flags.GCP_SERVICE_ACCOUNT.value, + network=xds_flags.NETWORK.value, + reuse_namespace=reuse_namespace, + reuse_service=reuse_service, + debug_use_port_forwarding=port_forwarding) + + if secure: + runner_kwargs.update( + xds_server_uri=xds_flags.XDS_SERVER_URI.value, + deployment_template='server-secure.deployment.yaml') + + return KubernetesServerRunner(namespace, **runner_kwargs) + + +def _ensure_atexit(signum, frame): + """Needed to handle signals or atexit handler won't be called.""" + del frame + + # Pylint is wrong about "Module 'signal' has no 'Signals' member": + # https://docs.python.org/3/library/signal.html#signal.Signals + sig = signal.Signals(signum) # pylint: disable=no-member + logger.warning('Caught %r, initiating graceful shutdown...\n', sig) + sys.exit(1) + + +def _graceful_exit(server_runner: KubernetesServerRunner, + client_runner: KubernetesClientRunner): + """Stop port forwarding processes.""" + client_runner.stop_pod_dependencies() + server_runner.stop_pod_dependencies() + + +def register_graceful_exit(server_runner: KubernetesServerRunner, + client_runner: KubernetesClientRunner): + atexit.register(_graceful_exit, server_runner, client_runner) + for signum in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT): + signal.signal(signum, _ensure_atexit) + + +def get_client_pod(client_runner: KubernetesClientRunner, + deployment_name: str) -> k8s.V1Pod: + client_deployment: k8s.V1Deployment + client_deployment = client_runner.k8s_namespace.get_deployment( + deployment_name) + client_pod_name: str = client_runner._wait_deployment_pod_count( + client_deployment)[0] + return client_runner._wait_pod_started(client_pod_name) + + +def get_server_pod(server_runner: KubernetesServerRunner, + deployment_name: str) -> k8s.V1Pod: + server_deployment: k8s.V1Deployment + server_deployment = server_runner.k8s_namespace.get_deployment( + deployment_name) + server_pod_name: str = server_runner._wait_deployment_pod_count( + server_deployment)[0] + return server_runner._wait_pod_started(server_pod_name) + + +def get_test_server_for_pod(server_runner: KubernetesServerRunner, + server_pod: k8s.V1Pod, **kwargs) -> _XdsTestServer: + return server_runner._xds_test_server_for_pod(server_pod, **kwargs) + + +def get_test_client_for_pod(client_runner: KubernetesClientRunner, + client_pod: k8s.V1Pod, **kwargs) -> _XdsTestClient: + return client_runner._xds_test_client_for_pod(client_pod, **kwargs) diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py index 290f6aa2c3b..79747be06a2 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py @@ -29,26 +29,21 @@ Typical usage examples: python -m bin.run_channelz --helpfull """ import hashlib -import logging from absl import app from absl import flags +from absl import logging +from bin.lib import common from framework import xds_flags from framework import xds_k8s_flags +from framework.infrastructure import gcp from framework.infrastructure import k8s from framework.rpc import grpc_channelz from framework.test_app import client_app from framework.test_app import server_app -logger = logging.getLogger(__name__) # Flags -_SERVER_RPC_HOST = flags.DEFINE_string('server_rpc_host', - default='127.0.0.1', - help='Server RPC host') -_CLIENT_RPC_HOST = flags.DEFINE_string('client_rpc_host', - default='127.0.0.1', - help='Client RPC host') _SECURITY = flags.DEFINE_enum('security', default=None, enum_values=[ @@ -59,7 +54,13 @@ _SECURITY = flags.DEFINE_enum('security', flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) # Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required("resource_suffix") +flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) +flags.register_validator(xds_flags.SERVER_XDS_PORT.name, + lambda val: val > 0, + message="Run outside of a test suite, must provide" + " the exact port value (must be greater than 0).") + +logger = logging.get_absl_logger() # Type aliases _Channel = grpc_channelz.Channel @@ -165,8 +166,8 @@ def debug_basic_setup(test_client, test_server): server_sock: _Socket = test_server.get_server_socket_matching_client( client_sock) - print(f'Client socket:\n{client_sock}\n') - print(f'Matching server:\n{server_sock}\n') + logger.debug('Client socket: %s\n', client_sock) + logger.debug('Matching server socket: %s\n', server_sock) def main(argv): @@ -176,45 +177,61 @@ def main(argv): # Must be called before KubernetesApiManager or GcpApiManager init. xds_flags.set_socket_default_timeout_from_flag() + # Flags. + should_port_forward: bool = xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value + is_secure: bool = bool(_SECURITY.value) + + # Setup. + gcp_api_manager = gcp.api.GcpApiManager() k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) - # Resource names. - resource_prefix: str = xds_flags.RESOURCE_PREFIX.value - - # Server - server_name = xds_flags.SERVER_NAME.value - server_namespace = resource_prefix - server_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, server_namespace) - server_pod = get_deployment_pods(server_k8s_ns, server_name)[0] - test_server: _XdsTestServer = _XdsTestServer( - ip=server_pod.status.pod_ip, - rpc_port=xds_flags.SERVER_PORT.value, - hostname=server_pod.metadata.name, - xds_host=xds_flags.SERVER_XDS_HOST.value, - xds_port=xds_flags.SERVER_XDS_PORT.value, - rpc_host=_SERVER_RPC_HOST.value) + # Server. + server_namespace = common.make_server_namespace(k8s_api_manager) + server_runner = common.make_server_runner( + server_namespace, + gcp_api_manager, + port_forwarding=should_port_forward, + secure=is_secure) + # Find server pod. + server_pod: k8s.V1Pod = common.get_server_pod(server_runner, + xds_flags.SERVER_NAME.value) # Client - client_name = xds_flags.CLIENT_NAME.value - client_namespace = resource_prefix - client_k8s_ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace) - client_pod = get_deployment_pods(client_k8s_ns, client_name)[0] - test_client: _XdsTestClient = _XdsTestClient( - ip=client_pod.status.pod_ip, - rpc_port=xds_flags.CLIENT_PORT.value, - server_target=test_server.xds_uri, - hostname=client_pod.metadata.name, - rpc_host=_CLIENT_RPC_HOST.value) - - if _SECURITY.value in ('mtls', 'tls', 'plaintext'): - debug_security_setup_positive(test_client, test_server) - elif _SECURITY.value == ('mtls_error', 'server_authz_error'): - debug_security_setup_negative(test_client) - else: - debug_basic_setup(test_client, test_server) - - test_client.close() - test_server.close() + client_namespace = common.make_client_namespace(k8s_api_manager) + client_runner = common.make_client_runner( + client_namespace, + gcp_api_manager, + port_forwarding=should_port_forward, + secure=is_secure) + # Find client pod. + client_pod: k8s.V1Pod = common.get_client_pod(client_runner, + xds_flags.CLIENT_NAME.value) + + # Ensure port forwarding stopped. + common.register_graceful_exit(server_runner, client_runner) + + # Create server app for the server pod. + test_server: _XdsTestServer = common.get_test_server_for_pod( + server_runner, + server_pod, + test_port=xds_flags.SERVER_PORT.value, + secure_mode=is_secure) + test_server.set_xds_address(xds_flags.SERVER_XDS_HOST.value, + xds_flags.SERVER_XDS_PORT.value) + + # Create client app for the client pod. + test_client: _XdsTestClient = common.get_test_client_for_pod( + client_runner, client_pod, server_target=test_server.xds_uri) + + with test_client, test_server: + if _SECURITY.value in ('mtls', 'tls', 'plaintext'): + debug_security_setup_positive(test_client, test_server) + elif _SECURITY.value in ('mtls_error', 'server_authz_error'): + debug_security_setup_negative(test_client) + else: + debug_basic_setup(test_client, test_server) + + logger.info('SUCCESS!') if __name__ == '__main__': diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py b/tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py new file mode 100755 index 00000000000..906e07e2b54 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py @@ -0,0 +1,141 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from absl import app +from absl import flags +from absl import logging + +from bin.lib import common +from framework import xds_flags +from framework import xds_k8s_flags +from framework.infrastructure import gcp +from framework.infrastructure import k8s +from framework.rpc import grpc_channelz +from framework.rpc import grpc_testing +from framework.test_app import client_app +from framework.test_app import server_app + +# Flags +_SECURE = flags.DEFINE_bool( + "secure", + default=False, + help="Set to True if the the client/server were started " + "with the PSM security enabled.") +_NUM_RPCS = flags.DEFINE_integer("num_rpcs", + default=100, + lower_bound=1, + upper_bound=10_000, + help="The number of RPCs to check.") +flags.adopt_module_key_flags(xds_flags) +flags.adopt_module_key_flags(xds_k8s_flags) +# Running outside of a test suite, so require explicit resource_suffix. +flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) +flags.register_validator(xds_flags.SERVER_XDS_PORT.name, + lambda val: val > 0, + message="Run outside of a test suite, must provide" + " the exact port value (must be greater than 0).") + +logger = logging.get_absl_logger() + +# Type aliases +_Channel = grpc_channelz.Channel +_Socket = grpc_channelz.Socket +_ChannelState = grpc_channelz.ChannelState +_XdsTestServer = server_app.XdsTestServer +_XdsTestClient = client_app.XdsTestClient +LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse + + +def get_client_rpc_stats(test_client: _XdsTestClient, + num_rpcs: int) -> LoadBalancerStatsResponse: + lb_stats = test_client.get_load_balancer_stats(num_rpcs=num_rpcs) + logger.info('Received LoadBalancerStatsResponse from test client %s:\n%s', + test_client.hostname, lb_stats) + return lb_stats + + +def run_ping_pong(test_client: _XdsTestClient, num_rpcs: int): + test_client.wait_for_active_server_channel() + lb_stats = get_client_rpc_stats(test_client, num_rpcs) + for backend, rpcs_count in lb_stats.rpcs_by_peer.items(): + if int(rpcs_count) < 1: + raise AssertionError( + f'Backend {backend} did not receive a single RPC') + + failed = int(lb_stats.num_failures) + if int(lb_stats.num_failures) > 0: + raise AssertionError( + f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed') + + +def main(argv): + if len(argv) > 1: + raise app.UsageError('Too many command-line arguments.') + + # Must be called before KubernetesApiManager or GcpApiManager init. + xds_flags.set_socket_default_timeout_from_flag() + + # Flags. + should_port_forward: bool = xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value + is_secure: bool = _SECURE.value + + # Setup. + gcp_api_manager = gcp.api.GcpApiManager() + k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) + + # Server. + server_namespace = common.make_server_namespace(k8s_api_manager) + server_runner = common.make_server_runner( + server_namespace, + gcp_api_manager, + port_forwarding=should_port_forward, + secure=is_secure) + # Find server pod. + server_pod: k8s.V1Pod = common.get_server_pod(server_runner, + xds_flags.SERVER_NAME.value) + + # Client + client_namespace = common.make_client_namespace(k8s_api_manager) + client_runner = common.make_client_runner( + client_namespace, + gcp_api_manager, + port_forwarding=should_port_forward, + secure=is_secure) + # Find client pod. + client_pod: k8s.V1Pod = common.get_client_pod(client_runner, + xds_flags.CLIENT_NAME.value) + + # Ensure port forwarding stopped. + common.register_graceful_exit(server_runner, client_runner) + + # Create server app for the server pod. + test_server: _XdsTestServer = common.get_test_server_for_pod( + server_runner, + server_pod, + test_port=xds_flags.SERVER_PORT.value, + secure_mode=is_secure) + test_server.set_xds_address(xds_flags.SERVER_XDS_HOST.value, + xds_flags.SERVER_XDS_PORT.value) + + # Create client app for the client pod. + test_client: _XdsTestClient = common.get_test_client_for_pod( + client_runner, client_pod, server_target=test_server.xds_uri) + + with test_client, test_server: + run_ping_pong(test_client, _NUM_RPCS.value) + + logger.info('SUCCESS!') + + +if __name__ == '__main__': + app.run(main) diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py b/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py index c315db6c18f..88e617ae983 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py @@ -62,7 +62,17 @@ _SECURITY = flags.DEFINE_enum('security', flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) # Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required("resource_suffix") +flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) + + +@flags.multi_flags_validator((xds_flags.SERVER_XDS_PORT.name, _CMD.name), + message="Run outside of a test suite, must provide" + " the exact port value (must be greater than 0).") +def _check_server_xds_port_flag(flags_dict): + if flags_dict[_CMD.name] not in ('create', 'cycle'): + return True + return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0 + # Type aliases _KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py b/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py index 7b82aa7c7c1..c1b80cf711d 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_test_client.py @@ -17,11 +17,11 @@ import signal from absl import app from absl import flags +from bin.lib import common from framework import xds_flags from framework import xds_k8s_flags from framework.infrastructure import gcp from framework.infrastructure import k8s -from framework.test_app.runners.k8s import k8s_xds_client_runner logger = logging.getLogger(__name__) # Flags @@ -36,10 +36,12 @@ _QPS = flags.DEFINE_integer('qps', default=25, help='Queries per second') _PRINT_RESPONSE = flags.DEFINE_bool("print_response", default=False, help="Client prints responses") -_FOLLOW = flags.DEFINE_bool("follow", - default=False, - help="Follow pod logs. " - "Requires --collect_app_logs") +_FOLLOW = flags.DEFINE_bool( + "follow", + default=False, + help= + "Follow pod logs. Requires --collect_app_logs or --debug_use_port_forwarding" +) _CONFIG_MESH = flags.DEFINE_bool( "config_mesh", default=None, @@ -54,13 +56,19 @@ _CLEANUP_NAMESPACE = flags.DEFINE_bool( flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) # Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required("resource_suffix") +flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) + -# Type aliases -_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +@flags.multi_flags_validator((xds_flags.SERVER_XDS_PORT.name, _CMD.name), + message="Run outside of a test suite, must provide" + " the exact port value (must be greater than 0).") +def _check_server_xds_port_flag(flags_dict): + if flags_dict[_CMD.name] == "cleanup": + return True + return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0 -def make_sigint_handler(client_runner: _KubernetesClientRunner): +def _make_sigint_handler(client_runner: common.KubernetesClientRunner): def sigint_handler(sig, frame): del sig, frame @@ -77,33 +85,21 @@ def main(argv): # Must be called before KubernetesApiManager or GcpApiManager init. xds_flags.set_socket_default_timeout_from_flag() - project: str = xds_flags.PROJECT.value - # GCP Service Account email - gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value - - # KubernetesClientRunner arguments. - runner_kwargs = dict( - deployment_name=xds_flags.CLIENT_NAME.value, - image_name=xds_k8s_flags.CLIENT_IMAGE.value, - td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value, - gcp_project=project, - gcp_api_manager=gcp.api.GcpApiManager(), - gcp_service_account=gcp_service_account, - xds_server_uri=xds_flags.XDS_SERVER_URI.value, - network=xds_flags.NETWORK.value, - stats_port=xds_flags.CLIENT_PORT.value, - reuse_namespace=_REUSE_NAMESPACE.value) - - if _SECURE.value: - runner_kwargs.update( - deployment_template='client-secure.deployment.yaml') + # Log following and port forwarding. + should_follow_logs = _FOLLOW.value and xds_flags.COLLECT_APP_LOGS.value + should_port_forward = (should_follow_logs and + xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value) + # Setup. + gcp_api_manager = gcp.api.GcpApiManager() k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) - client_namespace = _KubernetesClientRunner.make_namespace_name( - xds_flags.RESOURCE_PREFIX.value, xds_flags.RESOURCE_SUFFIX.value) - client_runner = _KubernetesClientRunner( - k8s.KubernetesNamespace(k8s_api_manager, client_namespace), - **runner_kwargs) + client_namespace = common.make_client_namespace(k8s_api_manager) + client_runner = common.make_client_runner( + client_namespace, + gcp_api_manager, + reuse_namespace=_REUSE_NAMESPACE.value, + secure=_SECURE.value, + port_forwarding=should_port_forward) # Server target server_xds_host = xds_flags.SERVER_XDS_HOST.value @@ -118,9 +114,9 @@ def main(argv): secure_mode=_SECURE.value, config_mesh=_CONFIG_MESH.value, log_to_stdout=_FOLLOW.value) - if client_runner.should_collect_logs and _FOLLOW.value: + if should_follow_logs: print('Following pod logs. Press Ctrl+C top stop') - signal.signal(signal.SIGINT, make_sigint_handler(client_runner)) + signal.signal(signal.SIGINT, _make_sigint_handler(client_runner)) signal.pause() elif _CMD.value == 'cleanup': diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py b/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py index 14ba2aa5d02..1cc0d89f534 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py @@ -17,11 +17,11 @@ import signal from absl import app from absl import flags +from bin.lib import common from framework import xds_flags from framework import xds_k8s_flags from framework.infrastructure import gcp from framework.infrastructure import k8s -from framework.test_app.runners.k8s import k8s_xds_server_runner logger = logging.getLogger(__name__) # Flags @@ -51,11 +51,8 @@ flags.adopt_module_key_flags(xds_k8s_flags) # Running outside of a test suite, so require explicit resource_suffix. flags.mark_flag_as_required("resource_suffix") -# Type aliases -_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner - -def make_sigint_handler(server_runner: _KubernetesServerRunner): +def _make_sigint_handler(server_runner: common.KubernetesServerRunner): def sigint_handler(sig, frame): del sig, frame @@ -72,37 +69,21 @@ def main(argv): # Must be called before KubernetesApiManager or GcpApiManager init. xds_flags.set_socket_default_timeout_from_flag() - project: str = xds_flags.PROJECT.value - # GCP Service Account email - gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value - - # Resource names. - resource_prefix: str = xds_flags.RESOURCE_PREFIX.value - resource_suffix: str = xds_flags.RESOURCE_SUFFIX.value - - # KubernetesServerRunner arguments. - runner_kwargs = dict( - deployment_name=xds_flags.SERVER_NAME.value, - image_name=xds_k8s_flags.SERVER_IMAGE.value, - td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value, - gcp_project=project, - gcp_api_manager=gcp.api.GcpApiManager(), - gcp_service_account=gcp_service_account, - network=xds_flags.NETWORK.value, - reuse_namespace=_REUSE_NAMESPACE.value, - reuse_service=_REUSE_SERVICE.value) - - if _SECURE.value: - runner_kwargs.update( - xds_server_uri=xds_flags.XDS_SERVER_URI.value, - deployment_template='server-secure.deployment.yaml') + should_follow_logs = _FOLLOW.value and xds_flags.COLLECT_APP_LOGS.value + should_port_forward = (should_follow_logs and + xds_k8s_flags.DEBUG_USE_PORT_FORWARDING.value) + # Setup. + gcp_api_manager = gcp.api.GcpApiManager() k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) - server_namespace = _KubernetesServerRunner.make_namespace_name( - resource_prefix, resource_suffix) - server_runner = _KubernetesServerRunner( - k8s.KubernetesNamespace(k8s_api_manager, server_namespace), - **runner_kwargs) + server_namespace = common.make_server_namespace(k8s_api_manager) + server_runner = common.make_server_runner( + server_namespace, + gcp_api_manager, + reuse_namespace=_REUSE_NAMESPACE.value, + reuse_service=_REUSE_SERVICE.value, + secure=_SECURE.value, + port_forwarding=should_port_forward) if _CMD.value == 'run': logger.info('Run server, secure_mode=%s', _SECURE.value) @@ -111,9 +92,9 @@ def main(argv): maintenance_port=xds_flags.SERVER_MAINTENANCE_PORT.value, secure_mode=_SECURE.value, log_to_stdout=_FOLLOW.value) - if server_runner.should_collect_logs and _FOLLOW.value: + if should_follow_logs: print('Following pod logs. Press Ctrl+C top stop') - signal.signal(signal.SIGINT, make_sigint_handler(server_runner)) + signal.signal(signal.SIGINT, _make_sigint_handler(server_runner)) signal.pause() elif _CMD.value == 'cleanup': diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py index 0ee821590f6..c6850ae1460 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -149,6 +149,10 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): # Verify the deployment reports all pods started as well. self._wait_deployment_with_available_replicas(self.deployment_name) + return self._xds_test_client_for_pod(pod, server_target=server_target) + + def _xds_test_client_for_pod(self, pod: k8s.V1Pod, *, + server_target: str) -> XdsTestClient: if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, self.stats_port) rpc_port, rpc_host = pf.local_port, pf.local_address @@ -158,7 +162,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): return XdsTestClient(ip=pod.status.pod_ip, rpc_port=rpc_port, server_target=server_target, - hostname=pod_name, + hostname=pod.metadata.name, rpc_host=rpc_host) def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 4ab36ddaceb..11222b9dd4a 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -103,16 +103,13 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): secure_mode: bool = False, replica_count: int = 1, log_to_stdout: bool = False) -> List[XdsTestServer]: + if not maintenance_port: + maintenance_port = self._get_default_maintenance_port(secure_mode) + # Implementation detail: in secure mode, maintenance ("backchannel") # port must be different from the test port so communication with - # maintenance services can be reached independently from the security + # maintenance services can be reached independently of the security # configuration under test. - if maintenance_port is None: - if not secure_mode: - maintenance_port = self.DEFAULT_MAINTENANCE_PORT - else: - maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT - if secure_mode and maintenance_port == test_port: raise ValueError('port and maintenance_port must be different ' 'when running test server in secure mode') @@ -193,23 +190,44 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): # Verify the deployment reports all pods started as well. self._wait_deployment_with_available_replicas(self.deployment_name, replica_count) - servers = [] + servers: List[XdsTestServer] = [] for pod in pods: - if self.debug_use_port_forwarding: - pf = self._start_port_forwarding_pod(pod, maintenance_port) - rpc_port, rpc_host = pf.local_port, pf.local_address - else: - rpc_port, rpc_host = maintenance_port, None - servers.append( - XdsTestServer(ip=pod.status.pod_ip, - rpc_port=test_port, - hostname=pod.metadata.name, - maintenance_port=rpc_port, - secure_mode=secure_mode, - rpc_host=rpc_host)) + self._xds_test_server_for_pod(pod, + test_port=test_port, + maintenance_port=maintenance_port, + secure_mode=secure_mode)) return servers + def _get_default_maintenance_port(self, secure_mode: bool) -> int: + if not secure_mode: + maintenance_port = self.DEFAULT_MAINTENANCE_PORT + else: + maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT + return maintenance_port + + def _xds_test_server_for_pod(self, + pod: k8s.V1Pod, + *, + test_port: int = DEFAULT_TEST_PORT, + maintenance_port: Optional[int] = None, + secure_mode: bool = False) -> XdsTestServer: + if maintenance_port is None: + maintenance_port = self._get_default_maintenance_port(secure_mode) + + if self.debug_use_port_forwarding: + pf = self._start_port_forwarding_pod(pod, maintenance_port) + rpc_port, rpc_host = pf.local_port, pf.local_address + else: + rpc_port, rpc_host = maintenance_port, None + + return XdsTestServer(ip=pod.status.pod_ip, + rpc_port=test_port, + hostname=pod.metadata.name, + maintenance_port=rpc_port, + secure_mode=secure_mode, + rpc_host=rpc_host) + def cleanup(self, *, force=False, force_namespace=False): # pylint: disable=arguments-differ if self.deployment or force: self._delete_deployment(self.deployment_name)