diff --git a/tools/internal_ci/linux/grpc_xds_resource_cleanup.sh b/tools/internal_ci/linux/grpc_xds_resource_cleanup.sh index 22b660ccdf2..ad212f505ae 100644 --- a/tools/internal_ci/linux/grpc_xds_resource_cleanup.sh +++ b/tools/internal_ci/linux/grpc_xds_resource_cleanup.sh @@ -13,61 +13,136 @@ # See the License for the specific language governing permissions and # limitations under the License. -set -ex +set -eo pipefail -# consts +# Constants readonly GITHUB_REPOSITORY_NAME="grpc" readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/grpc/${TEST_DRIVER_BRANCH:-master}/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh" +# Keep orphaned resources last 2 days. +readonly KEEP_HOURS="${KEEP_HOURS:-48}" -cd "$(dirname "$0")/../../.." +cleanup::activate_cluster() { + activate_gke_cluster "$1" + gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" \ + --zone "${GKE_CLUSTER_ZONE}" + CLEANUP_KUBE_CONTEXT="$(kubectl config current-context)" +} -# Source the test driver from the master branch. -echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}" -source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")" -activate_gke_cluster GKE_CLUSTER_PSM_SECURITY -kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}" +cleanup::activate_secondary_cluster_as_primary() { + activate_secondary_gke_cluster "$1" + GKE_CLUSTER_NAME="${SECONDARY_GKE_CLUSTER_NAME}" + GKE_CLUSTER_ZONE="${SECONDARY_GKE_CLUSTER_ZONE}" + gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" \ + --zone "${GKE_CLUSTER_ZONE}" + CLEANUP_KUBE_CONTEXT="$(kubectl config current-context)" +} -cd "${TEST_DRIVER_FULL_DIR}" +cleanup::job::cleanup_td() { + cleanup::run_clean "$1" --mode=td +} -# flag resource_prefix is required by the gke test framework, but doesn't -# matter for the cleanup script. -python3 -m bin.cleanup.cleanup \ - --project=grpc-testing \ - --network=default-vpc \ - --kube_context="${KUBE_CONTEXT}" \ - --gcp_service_account=xds-k8s-interop-tests@grpc-testing.iam.gserviceaccount.com \ - --resource_prefix='required-but-does-not-matter' \ - --td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter' +####################################### +# The PSM_LB cluster is used by k8s_lb tests. +# The keep hours is reduced to 6. +####################################### +cleanup::job::cleanup_cluster_lb_primary() { + cleanup::activate_cluster GKE_CLUSTER_PSM_LB + cleanup::run_clean "$1" --mode=k8s +} -# The BASIC cluster is used by url-map tests. Only cleaning the GKE client -# namespaces, which won't provide much value in debugging. The keep hours is -# reduced to 6. -activate_gke_cluster GKE_CLUSTER_PSM_BASIC -# Invoking the get-crednetials directly, because the -# gcloud_get_cluster_credentials re-sets readonly Bash variables, which is nice -# safety mechanism to keep. 
-gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" --zone "${GKE_CLUSTER_ZONE}" -TARGET_KUBE_CONTEXT="$(kubectl config current-context)" -python3 -m bin.cleanup.namespace \ - --project=grpc-testing \ - --network=default-vpc \ - --keep_hours=6 \ - --kube_context="${TARGET_KUBE_CONTEXT}" \ - --gcp_service_account=xds-k8s-interop-tests@grpc-testing.iam.gserviceaccount.com \ - --resource_prefix='required-but-does-not-matter' \ - --td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter' +####################################### +# Secondary PSM_LB cluster is used by k8s_lb tests. +# The keep hours is reduced to 6. +####################################### +cleanup::job::cleanup_cluster_lb_secondary() { + cleanup::activate_secondary_cluster_as_primary GKE_CLUSTER_PSM_LB + cleanup::run_clean "$1" --mode=k8s --secondary +} -# The PSM_LB cluster is used by k8s_lb tests. Only cleaning the GKE client -# namespaces, which won't provide much value in debugging. The keep hours is -# reduced to 6. -activate_gke_cluster GKE_CLUSTER_PSM_LB -gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" --zone "${GKE_CLUSTER_ZONE}" -TARGET_KUBE_CONTEXT="$(kubectl config current-context)" -python3 -m bin.cleanup.namespace \ - --project=grpc-testing \ - --network=default-vpc \ - --keep_hours=6 \ - --kube_context="${TARGET_KUBE_CONTEXT}" \ - --gcp_service_account=xds-k8s-interop-tests@grpc-testing.iam.gserviceaccount.com \ - --resource_prefix='required-but-does-not-matter' \ - --td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter' +####################################### +# The BASIC cluster is used by url-map tests. Only cleaning the xds client +# namespaces; the xds server namespaces are shared. +# The keep hours is reduced to 6. +####################################### +cleanup::job::cleanup_cluster_url_map() { + cleanup::activate_cluster GKE_CLUSTER_PSM_BASIC + cleanup::run_clean "$1" --mode=k8s +} + +####################################### +# The SECURITY cluster is used by the security and authz test suites. +####################################### +cleanup::job::cleanup_cluster_security() { + cleanup::activate_cluster GKE_CLUSTER_PSM_SECURITY + cleanup::run_clean "$1" --mode=k8s +} + +####################################### +# Set common variables for the cleanup script. +# Globals: +# TEST_DRIVER_FLAGFILE: Relative path to test driver flagfile +# TEST_XML_OUTPUT_DIR: Output directory for the test xUnit XML report +# CLEANUP_KUBE_CONTEXT: The name of kubectl context with GKE cluster access. +# Arguments: +# Test job name. Currently only used to generate asset path, and uses +# values from the cleanup_jobs array of main(). +# TODO(sergiitk): turn job_name into action test methods of the cleanup. +# Outputs: +# Writes the output of test execution to stdout, stderr, +# ${TEST_XML_OUTPUT_DIR}/${job_name}/sponge_log.log +####################################### +cleanup::run_clean() { + local job_name="${1:?Usage: cleanup::run_clean job_name}" + local out_dir="${TEST_XML_OUTPUT_DIR}/${job_name}" + mkdir -pv "${out_dir}" + # TODO(sergiitk): make it a test, where job_name is a separate method. 
+  python3 -m bin.cleanup.cleanup \
+    --flagfile="${TEST_DRIVER_FLAGFILE}" \
+    --kube_context="${CLEANUP_KUBE_CONTEXT:-unset}" \
+    --keep_hours="${KEEP_HOURS}" \
+    "${@:2}" \
+    |& tee "${out_dir}/sponge_log.log"
+}
+
+#######################################
+# Main function: provision software necessary to execute the cleanup tasks;
+# run them, and report the status.
+#######################################
+main() {
+  local script_dir
+  script_dir="$(dirname "$0")"
+
+  # Source the test driver from the master branch.
+  echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}"
+  source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")"
+  set +x
+
+  # Valid cluster variables needed for the automatic driver setup.
+  activate_gke_cluster GKE_CLUSTER_PSM_BASIC
+  kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}"
+
+  # Run tests
+  cd "${TEST_DRIVER_FULL_DIR}"
+  local failed_jobs=0
+  declare -a cleanup_jobs
+  cleanup_jobs=(
+    "cleanup_td"
+    "cleanup_cluster_lb_primary"
+    "cleanup_cluster_lb_secondary"
+    "cleanup_cluster_security"
+    "cleanup_cluster_url_map"
+  )
+  for job_name in "${cleanup_jobs[@]}"; do
+    echo "-------------------- Starting job ${job_name} --------------------"
+    set -x
+    "cleanup::job::${job_name}" "${job_name}" || (( ++failed_jobs ))
+    set +x
+    echo "-------------------- Finished job ${job_name} --------------------"
+  done
+  echo "Failed job suites: ${failed_jobs}"
+  if (( failed_jobs > 0 )); then
+    exit 1
+  fi
+}
+
+main "$@"
diff --git a/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py b/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py
index 7a2581a4cea..a778610eee5 100755
--- a/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py
+++ b/tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py
@@ -17,13 +17,12 @@ This is intended as a tool to delete leaked resources from old tests.
 
 Typical usage examples:
-python3 tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py\
-    --project=grpc-testing\
-    --network=default-vpc\
+python3 -m bin.cleanup.cleanup \
+  --project=grpc-testing \
+  --network=default-vpc \
   --kube_context=gke_grpc-testing_us-central1-a_psm-interop-security
-    --resource_prefix='required-but-does-not-matter'\
-    --td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter'
 """
+import dataclasses
 import datetime
 import functools
 import json
@@ -31,7 +30,8 @@ import logging
 import os
 import re
 import subprocess
-from typing import Any, List
+import sys
+from typing import Any, Callable, List, Optional
 
 from absl import app
 from absl import flags
@@ -39,6 +39,7 @@ import dateutil
 
 from framework import xds_flags
 from framework import xds_k8s_flags
+from framework.helpers import retryers
 from framework.infrastructure import gcp
 from framework.infrastructure import k8s
 from framework.infrastructure import traffic_director
@@ -52,20 +53,31 @@ _KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
 GCLOUD = os.environ.get("GCLOUD", "gcloud")
 GCLOUD_CMD_TIMEOUT_S = datetime.timedelta(seconds=5).total_seconds()
-ZONE = "us-central1-a"
-SECONDARY_ZONE = "us-west1-b"
-PSM_SECURITY_PREFIX = "psm-interop"  # Prefix for gke resources to delete.
-URL_MAP_TEST_PREFIX = (  # Prefix for url-map test resources to delete.
-    "interop-psm-url-map"
+# Skip known k8s system namespaces.
+K8S_PROTECTED_NAMESPACES = { + "default", + "gke-managed-system", + "kube-node-lease", + "kube-public", + "kube-system", +} + +# TODO(sergiitk): these should be flags. +LEGACY_DRIVER_ZONE = "us-central1-a" +LEGACY_DRIVER_SECONDARY_ZONE = "us-west1-b" + +PSM_INTEROP_PREFIX = "psm-interop" # Prefix for gke resources to delete. +URL_MAP_TEST_PREFIX = ( + "interop-psm-url-map" # Prefix for url-map test resources to delete. ) KEEP_PERIOD_HOURS = flags.DEFINE_integer( "keep_hours", - default=168, + default=48, help=( "number of hours for a resource to keep. Resources older than this will" - " be deleted. Default is 168 (7 days)" + " be deleted. Default is 48 hours (2 days)" ), ) DRY_RUN = flags.DEFINE_bool( @@ -75,7 +87,7 @@ DRY_RUN = flags.DEFINE_bool( ) TD_RESOURCE_PREFIXES = flags.DEFINE_list( "td_resource_prefixes", - default=[PSM_SECURITY_PREFIX], + default=[PSM_INTEROP_PREFIX], help=( "a comma-separated list of prefixes for which the leaked TD resources" " will be deleted" @@ -83,7 +95,7 @@ TD_RESOURCE_PREFIXES = flags.DEFINE_list( ) SERVER_PREFIXES = flags.DEFINE_list( "server_prefixes", - default=[PSM_SECURITY_PREFIX], + default=[PSM_INTEROP_PREFIX], help=( "a comma-separated list of prefixes for which the leaked servers will" " be deleted" @@ -91,12 +103,55 @@ SERVER_PREFIXES = flags.DEFINE_list( ) CLIENT_PREFIXES = flags.DEFINE_list( "client_prefixes", - default=[PSM_SECURITY_PREFIX, URL_MAP_TEST_PREFIX], + default=[PSM_INTEROP_PREFIX, URL_MAP_TEST_PREFIX], help=( "a comma-separated list of prefixes for which the leaked clients will" " be deleted" ), ) +MODE = flags.DEFINE_enum( + "mode", + default="td", + enum_values=["k8s", "td", "td_no_legacy"], + help="Mode: Kubernetes or Traffic Director", +) +SECONDARY = flags.DEFINE_bool( + "secondary", + default=False, + help="Cleanup secondary (alternative) resources", +) + +# The cleanup script performs some API calls directly, so some flags normally +# required to configure framework properly, are not needed here. +flags.FLAGS.set_default("resource_prefix", "ignored-by-cleanup") +flags.FLAGS.set_default("td_bootstrap_image", "ignored-by-cleanup") +flags.FLAGS.set_default("server_image", "ignored-by-cleanup") +flags.FLAGS.set_default("client_image", "ignored-by-cleanup") + + +@dataclasses.dataclass(eq=False) +class CleanupResult: + error_count: int = 0 + error_messages: List[str] = dataclasses.field(default_factory=list) + + def add_error(self, msg: str): + self.error_count += 1 + self.error_messages.append(f" {self.error_count}. {msg}") + + def format_messages(self): + return "\n".join(self.error_messages) + + +@dataclasses.dataclass(frozen=True) +class K8sResourceRule: + # regex to match + expression: str + # function to delete the resource + cleanup_ns_fn: Callable + + +# Global state, holding the result of the whole operation. 
+_CLEANUP_RESULT = CleanupResult() def load_keep_config() -> None: @@ -129,7 +184,7 @@ def get_expire_timestamp() -> datetime.datetime: ) -def exec_gcloud(project: str, *cmds: List[str]) -> Json: +def exec_gcloud(project: str, *cmds: str) -> Json: cmds = [GCLOUD, "--project", project, "--quiet"] + list(cmds) if "list" in cmds: # Add arguments to shape the list output @@ -167,10 +222,10 @@ def exec_gcloud(project: str, *cmds: List[str]) -> Json: return None -def remove_relative_resources_run_xds_tests( - project: str, network: str, prefix: str, suffix: str -): +def cleanup_legacy_driver_resources(*, project: str, suffix: str, **kwargs): """Removing GCP resources created by run_xds_tests.py.""" + # Unused, but kept for compatibility with cleanup_td_for_gke. + del kwargs logging.info( "----- Removing run_xds_tests.py resources with suffix [%s]", suffix ) @@ -244,7 +299,7 @@ def remove_relative_resources_run_xds_tests( "delete", f"test-ig{suffix}", "--zone", - ZONE, + LEGACY_DRIVER_ZONE, ) exec_gcloud( project, @@ -254,7 +309,7 @@ def remove_relative_resources_run_xds_tests( "delete", f"test-ig-same-zone{suffix}", "--zone", - ZONE, + LEGACY_DRIVER_ZONE, ) exec_gcloud( project, @@ -264,7 +319,7 @@ def remove_relative_resources_run_xds_tests( "delete", f"test-ig-secondary-zone{suffix}", "--zone", - SECONDARY_ZONE, + LEGACY_DRIVER_SECONDARY_ZONE, ) exec_gcloud( project, @@ -281,21 +336,21 @@ def remove_relative_resources_run_xds_tests( # Note that the varients are all based on the basic TrafficDirectorManager, so # their `cleanup()` might do duplicate work. But deleting an non-exist resource # returns 404, and is OK. -def cleanup_td_for_gke(project, network, resource_prefix, resource_suffix): +def cleanup_td_for_gke(*, project, prefix, suffix, network): gcp_api_manager = gcp.api.GcpApiManager() plain_td = traffic_director.TrafficDirectorManager( gcp_api_manager, project=project, network=network, - resource_prefix=resource_prefix, - resource_suffix=resource_suffix, + resource_prefix=prefix, + resource_suffix=suffix, ) security_td = traffic_director.TrafficDirectorSecureManager( gcp_api_manager, project=project, network=network, - resource_prefix=resource_prefix, - resource_suffix=resource_suffix, + resource_prefix=prefix, + resource_suffix=suffix, ) # TODO: cleanup appnet resources. # appnet_td = traffic_director.TrafficDirectorAppNetManager( @@ -307,8 +362,8 @@ def cleanup_td_for_gke(project, network, resource_prefix, resource_suffix): logger.info( "----- Removing traffic director for gke, prefix %s, suffix %s", - resource_prefix, - resource_suffix, + prefix, + suffix, ) security_td.cleanup(force=True) # appnet_td.cleanup(force=True) @@ -320,32 +375,42 @@ def cleanup_client( project, network, k8s_api_manager, - resource_prefix, - resource_suffix, + client_namespace, + gcp_api_manager, gcp_service_account, + *, + suffix: Optional[str] = "", ): - runner_kwargs = dict( - deployment_name=xds_flags.CLIENT_NAME.value, - image_name=xds_k8s_flags.CLIENT_IMAGE.value, - td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value, + deployment_name = xds_flags.CLIENT_NAME.value + if suffix: + deployment_name = f"{deployment_name}-{suffix}" + + ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace) + # Shorten the timeout to avoid waiting for the stuck namespaces. + # Normal ns deletion during the cleanup takes less two minutes. 
+ ns.wait_for_namespace_deleted_timeout_sec = 5 * 60 + client_runner = _KubernetesClientRunner( + k8s_namespace=ns, + deployment_name=deployment_name, gcp_project=project, - gcp_api_manager=gcp.api.GcpApiManager(), - gcp_service_account=gcp_service_account, - xds_server_uri=xds_flags.XDS_SERVER_URI.value, network=network, - stats_port=xds_flags.CLIENT_PORT.value, - ) - - client_namespace = _KubernetesClientRunner.make_namespace_name( - resource_prefix, resource_suffix - ) - client_runner = _KubernetesClientRunner( - k8s.KubernetesNamespace(k8s_api_manager, client_namespace), - **runner_kwargs, + gcp_service_account=gcp_service_account, + gcp_api_manager=gcp_api_manager, + image_name="", + td_bootstrap_image="", ) logger.info("Cleanup client") - client_runner.cleanup(force=True, force_namespace=True) + try: + client_runner.cleanup(force=True, force_namespace=True) + except retryers.RetryError as err: + logger.error( + "Timeout waiting for namespace %s deletion. " + "Failed resource status:\n\n%s", + ns.name, + ns.pretty_format_status(err.result()), + ) + raise # cleanup_server creates a server runner, and calls its cleanup() method. @@ -353,30 +418,42 @@ def cleanup_server( project, network, k8s_api_manager, - resource_prefix, - resource_suffix, + server_namespace, + gcp_api_manager, gcp_service_account, + *, + suffix: Optional[str] = "", ): - runner_kwargs = dict( - deployment_name=xds_flags.SERVER_NAME.value, - image_name=xds_k8s_flags.SERVER_IMAGE.value, - td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value, + deployment_name = xds_flags.SERVER_NAME.value + if suffix: + deployment_name = f"{deployment_name}-{suffix}" + + ns = k8s.KubernetesNamespace(k8s_api_manager, server_namespace) + # Shorten the timeout to avoid waiting for the stuck namespaces. + # Normal ns deletion during the cleanup takes less two minutes. + ns.wait_for_namespace_deleted_timeout_sec = 5 * 60 + server_runner = _KubernetesServerRunner( + k8s_namespace=ns, + deployment_name=deployment_name, gcp_project=project, - gcp_api_manager=gcp.api.GcpApiManager(), - gcp_service_account=gcp_service_account, network=network, - ) - - server_namespace = _KubernetesServerRunner.make_namespace_name( - resource_prefix, resource_suffix - ) - server_runner = _KubernetesServerRunner( - k8s.KubernetesNamespace(k8s_api_manager, server_namespace), - **runner_kwargs, + gcp_service_account=gcp_service_account, + gcp_api_manager=gcp_api_manager, + image_name="", + td_bootstrap_image="", ) logger.info("Cleanup server") - server_runner.cleanup(force=True, force_namespace=True) + try: + server_runner.cleanup(force=True, force_namespace=True) + except retryers.RetryError as err: + logger.error( + "Timeout waiting for namespace %s deletion. 
" + "Failed resource status:\n\n%s", + ns.name, + ns.pretty_format_status(err.result()), + ) + raise def delete_leaked_td_resources( @@ -390,14 +467,19 @@ def delete_leaked_td_resources( logging.info("----- Skipped [Dry Run]: %s", resource["name"]) continue matched = False - for regex, resource_prefix, keep, remove in td_resource_rules: + for regex, resource_prefix, keep, remove_fn in td_resource_rules: result = re.search(regex, resource["name"]) if result is not None: matched = True if keep(result.group(1)): logging.info("Skipped [keep]:") break # break inner loop, continue outer loop - remove(project, network, resource_prefix, result.group(1)) + remove_fn( + project=project, + prefix=resource_prefix, + suffix=result.group(1), + network=network, + ) break if not matched: logging.info( @@ -414,58 +496,97 @@ def delete_k8s_resources( gcp_service_account, namespaces, ): + gcp_api_manager = gcp.api.GcpApiManager() for ns in namespaces: + namespace_name: str = ns.metadata.name + if namespace_name in K8S_PROTECTED_NAMESPACES: + continue + logger.info("-----") - logger.info("----- Cleaning up k8s namespaces %s", ns.metadata.name) - if ns.metadata.creation_timestamp <= get_expire_timestamp(): - if dry_run: - # Skip deletion for dry-runs - logging.info("----- Skipped [Dry Run]: %s", ns.metadata.name) - continue - - matched = False - for regex, resource_prefix, remove in k8s_resource_rules: - result = re.search(regex, ns.metadata.name) - if result is not None: - matched = True - remove( - project, - network, - k8s_api_manager, - resource_prefix, - result.group(1), - gcp_service_account, - ) - break - if not matched: - logging.info( - "----- Skipped [does not matching resource name templates]" - ) - else: - logging.info("----- Skipped [resource is within expiry date]") + logger.info("----- Cleaning up k8s namespaces %s", namespace_name) + + if ns.metadata.creation_timestamp > get_expire_timestamp(): + logging.info( + "----- Skipped [resource is within expiry date]: %s", + namespace_name, + ) + continue + + if dry_run: + # Skip deletion for dry-runs + logging.info("----- Skipped [Dry Run]: %s", ns.metadata.name) + continue + + rule: K8sResourceRule = _rule_match_k8s_namespace( + namespace_name, k8s_resource_rules + ) + if not rule: + logging.info( + "----- Skipped [does not matching resource name templates]: %s", + namespace_name, + ) + continue + + # Cleaning up. 
+ try: + rule.cleanup_ns_fn( + project, + network, + k8s_api_manager, + namespace_name, + gcp_api_manager, + gcp_service_account, + suffix=("alt" if SECONDARY.value else None), + ) + except k8s.NotFound: + logging.warning("----- Skipped [not found]: %s", namespace_name) + except retryers.RetryError as err: + _CLEANUP_RESULT.add_error( + "Retries exhausted while waiting for the " + f"deletion of namespace {namespace_name}: " + f"{err}" + ) + logging.exception( + "----- Skipped [cleanup timed out]: %s", namespace_name + ) + except Exception as err: # noqa pylint: disable=broad-except + _CLEANUP_RESULT.add_error( + "Unexpected error while deleting " + f"namespace {namespace_name}: {err}" + ) + logging.exception( + "----- Skipped [cleanup unexpected error]: %s", namespace_name + ) + + logger.info("-----") + + +def _rule_match_k8s_namespace( + namespace_name: str, k8s_resource_rules: List[K8sResourceRule] +) -> Optional[K8sResourceRule]: + for rule in k8s_resource_rules: + result = re.search(rule.expression, namespace_name) + if result is not None: + return rule + return None def find_and_remove_leaked_k8s_resources( - dry_run, project, network, gcp_service_account + dry_run, project, network, gcp_service_account, k8s_context ): - k8s_resource_rules = [ - # items in each tuple, in order - # - regex to match - # - prefix of the resources - # - function to delete the resource - ] + k8s_resource_rules: List[K8sResourceRule] = [] for prefix in CLIENT_PREFIXES.value: k8s_resource_rules.append( - (f"{prefix}-client-(.*)", prefix, cleanup_client), + K8sResourceRule(f"{prefix}-client-(.*)", cleanup_client) ) for prefix in SERVER_PREFIXES.value: k8s_resource_rules.append( - (f"{prefix}-server-(.*)", prefix, cleanup_server), + K8sResourceRule(f"{prefix}-server-(.*)", cleanup_server) ) # Delete leaked k8s namespaces, those usually mean there are leaked testing # client/servers from the gke framework. - k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) + k8s_api_manager = k8s.KubernetesApiManager(k8s_context) nss = k8s_api_manager.core.list_namespace() delete_k8s_resources( dry_run, @@ -478,38 +599,32 @@ def find_and_remove_leaked_k8s_resources( ) -def main(argv): - if len(argv) > 1: - raise app.UsageError("Too many command-line arguments.") - load_keep_config() - - # Must be called before KubernetesApiManager or GcpApiManager init. 
- xds_flags.set_socket_default_timeout_from_flag() - - project: str = xds_flags.PROJECT.value - network: str = xds_flags.NETWORK.value - gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value - dry_run: bool = DRY_RUN.value - +def find_and_remove_leaked_td_resources(dry_run, project, network): + cleanup_legacy: bool = MODE.value != "td_no_legacy" td_resource_rules = [ # itmes in each tuple, in order # - regex to match # - prefix of the resource (only used by gke resources) # - function to check of the resource should be kept # - function to delete the resource - ( - r"test-hc(.*)", - "", - is_marked_as_keep_gce, - remove_relative_resources_run_xds_tests, - ), - ( - r"test-template(.*)", - "", - is_marked_as_keep_gce, - remove_relative_resources_run_xds_tests, - ), ] + + if cleanup_legacy: + td_resource_rules += [ + ( + r"test-hc(.*)", + "", + is_marked_as_keep_gce, + cleanup_legacy_driver_resources, + ), + ( + r"test-template(.*)", + "", + is_marked_as_keep_gce, + cleanup_legacy_driver_resources, + ), + ] + for prefix in TD_RESOURCE_PREFIXES.value: td_resource_rules.append( ( @@ -521,7 +636,8 @@ def main(argv): ) # List resources older than KEEP_PERIOD. We only list health-checks and - # instance templates because these are leaves in the resource dependency tree. + # instance templates because these are leaves in the resource dependency + # tree. # # E.g. forwarding-rule depends on the target-proxy. So leaked # forwarding-rule indicates there's a leaked target-proxy (because this @@ -529,31 +645,69 @@ def main(argv): # leaked target-proxy is guaranteed to be a super set of leaked # forwarding-rule. compute = gcp.compute.ComputeV1(gcp.api.GcpApiManager(), project) - leakedHealthChecks = [] + leaked_health_checks = [] for item in compute.list_health_check()["items"]: if ( dateutil.parser.isoparse(item["creationTimestamp"]) <= get_expire_timestamp() ): - leakedHealthChecks.append(item) + leaked_health_checks.append(item) delete_leaked_td_resources( - dry_run, td_resource_rules, project, network, leakedHealthChecks + dry_run, td_resource_rules, project, network, leaked_health_checks ) # Delete leaked instance templates, those usually mean there are leaked VMs # from the gce framework. Also note that this is only needed for the gce # resources. - leakedInstanceTemplates = exec_gcloud( - project, "compute", "instance-templates", "list" - ) - delete_leaked_td_resources( - dry_run, td_resource_rules, project, network, leakedInstanceTemplates - ) + if cleanup_legacy: + leaked_instance_templates = exec_gcloud( + project, "compute", "instance-templates", "list" + ) + delete_leaked_td_resources( + dry_run, + td_resource_rules, + project, + network, + leaked_instance_templates, + ) - find_and_remove_leaked_k8s_resources( - dry_run, project, network, gcp_service_account - ) + +def main(argv): + # TODO(sergiitk): instead, base on absltest so that result.xml is available. + if len(argv) > 1: + raise app.UsageError("Too many command-line arguments.") + load_keep_config() + + # Must be called before KubernetesApiManager or GcpApiManager init. 
+ xds_flags.set_socket_default_timeout_from_flag() + + project: str = xds_flags.PROJECT.value + network: str = xds_flags.NETWORK.value + gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value + dry_run: bool = DRY_RUN.value + k8s_context: str = xds_k8s_flags.KUBE_CONTEXT.value + + if MODE.value == "td" or MODE.value == "td_no_legacy": + find_and_remove_leaked_td_resources(dry_run, project, network) + elif MODE.value == "k8s": + # 'unset' value is used in td-only mode to bypass the validation + # for the required flag. + assert k8s_context != "unset" + find_and_remove_leaked_k8s_resources( + dry_run, project, network, gcp_service_account, k8s_context + ) + + logger.info("##################### Done cleaning up #####################") + if _CLEANUP_RESULT.error_count > 0: + logger.error( + "Cleanup failed for %i resource(s). Errors: [\n%s\n].\n" + "Please inspect the log files for stack traces corresponding " + "to these errors.", + _CLEANUP_RESULT.error_count, + _CLEANUP_RESULT.format_messages(), + ) + sys.exit(1) if __name__ == "__main__": diff --git a/tools/run_tests/xds_k8s_test_driver/bin/cleanup/namespace.py b/tools/run_tests/xds_k8s_test_driver/bin/cleanup/namespace.py deleted file mode 100644 index 27c1aee92c9..00000000000 --- a/tools/run_tests/xds_k8s_test_driver/bin/cleanup/namespace.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2022 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Clean up GKE namespaces leaked by the tests.""" - -from absl import app - -from bin.cleanup import cleanup -from framework import xds_flags -from framework import xds_k8s_flags - - -def main(argv): - if len(argv) > 1: - raise app.UsageError("Too many command-line arguments.") - cleanup.load_keep_config() - - # Must be called before KubernetesApiManager or GcpApiManager init. 
- xds_flags.set_socket_default_timeout_from_flag() - - project: str = xds_flags.PROJECT.value - network: str = xds_flags.NETWORK.value - gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value - dry_run: bool = cleanup.DRY_RUN.value - - cleanup.find_and_remove_leaked_k8s_resources( - dry_run, project, network, gcp_service_account - ) - - -if __name__ == "__main__": - app.run(main) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py b/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py index 08d06a1349f..27236d0fc06 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py @@ -14,7 +14,9 @@ """This contains common helpers for working with dates and time.""" import datetime import re -from typing import Pattern +from typing import Optional, Pattern + +import dateutil.parser RE_ZERO_OFFSET: Pattern[str] = re.compile(r"[+\-]00:?00$") @@ -35,6 +37,11 @@ def iso8601_utc_time(time: datetime.datetime = None) -> str: return shorten_utc_zone(utc_time.isoformat()) +def iso8601_to_datetime(date_str: str) -> datetime.datetime: + # TODO(sergiitk): use regular datetime.datetime when upgraded to py3.11. + return dateutil.parser.isoparse(date_str) + + def datetime_suffix(*, seconds: bool = False) -> str: """Return current UTC date, and time in a format useful for resource naming. @@ -48,3 +55,25 @@ def datetime_suffix(*, seconds: bool = False) -> str: visually distinct from dash-separated date. """ return utc_now().strftime("%Y%m%d-%H%M" + ("%S" if seconds else "")) + + +def ago(date_from: datetime.datetime, now: Optional[datetime.datetime] = None): + if not now: + now = utc_now() + + # Round down microseconds. + date_from = date_from.replace(microsecond=0) + now = now.replace(microsecond=0) + + # Calculate the diff. + delta: datetime.timedelta = now - date_from + + if delta.days > 1: + result = f"{delta.days} days" + elif delta.days > 0: + result = f"{delta.days} day" + else: + # This case covers negative deltas too. 
+        result = f"{delta} (h:mm:ss)"
+
+    return f"{result} ago"
diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
index 1056de64eae..3afcd007787 100644
--- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
+++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
@@ -34,6 +34,7 @@ import yaml
 
 import framework.errors
 from framework.helpers import retryers
+import framework.helpers.datetime
 import framework.helpers.highlighter
 from framework.infrastructure.k8s_internal import k8s_log_collector
 from framework.infrastructure.k8s_internal import k8s_port_forwarder
@@ -41,7 +42,6 @@ from framework.infrastructure.k8s_internal import k8s_port_forwarder
 logger = logging.getLogger(__name__)
 
 # Type aliases
-_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
 PodLogCollector = k8s_log_collector.PodLogCollector
 PortForwarder = k8s_port_forwarder.PortForwarder
 V1Deployment = client.V1Deployment
@@ -50,6 +50,7 @@ V1Pod = client.V1Pod
 V1PodList = client.V1PodList
 V1Service = client.V1Service
 V1Namespace = client.V1Namespace
+V1ObjectMeta = client.V1ObjectMeta
 
 DynResourceInstance = dynamic_res.ResourceInstance
 GammaMesh = DynResourceInstance
@@ -60,6 +61,9 @@ GcpSessionAffinityFilter = DynResourceInstance
 GcpBackendPolicy = DynResourceInstance
 
 _timedelta = datetime.timedelta
+_datetime = datetime.datetime
+_helper_datetime = framework.helpers.datetime
+_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
 _ApiException = client.ApiException
 _FailToCreateError = utils.FailToCreateError
@@ -265,6 +269,10 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
     WAIT_LONG_SLEEP_SEC: int = 30
     WAIT_POD_START_TIMEOUT_SEC: int = 3 * 60
 
+    # TODO(sergiitk): Find a better way. Maybe like in framework.rpc.grpc?
+    wait_for_namespace_deleted_timeout_sec = None
+    wait_for_namespace_deleted_sleep_sec = None
+
     def __init__(self, api: KubernetesApiManager, name: str):
         self._api = api
         self._name = name
@@ -761,9 +769,20 @@ def wait_for_namespace_deleted(
         self,
-        timeout_sec: int = WAIT_LONG_TIMEOUT_SEC,
-        wait_sec: int = WAIT_LONG_SLEEP_SEC,
+        timeout_sec: Optional[int] = None,
+        wait_sec: Optional[int] = None,
     ) -> None:
+        if timeout_sec is None:
+            if self.wait_for_namespace_deleted_timeout_sec is not None:
+                timeout_sec = self.wait_for_namespace_deleted_timeout_sec
+            else:
+                timeout_sec = self.WAIT_LONG_TIMEOUT_SEC
+        if wait_sec is None:
+            if self.wait_for_namespace_deleted_sleep_sec is not None:
+                wait_sec = self.wait_for_namespace_deleted_sleep_sec
+            else:
+                wait_sec = self.WAIT_LONG_SLEEP_SEC
+
         retryer = retryers.constant_retryer(
             wait_fixed=_timedelta(seconds=wait_sec),
             timeout=_timedelta(seconds=timeout_sec),
@@ -797,9 +816,9 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
                     f"\nThis indicates the NEG wasn't created OR"
                     f" the NEG creation event hasn't propagated to Kubernetes."
                    f" Service metadata:\n"
-                    f"{self._pretty_format_metadata(result, highlight=False)}"
+                    f"{self.pretty_format_metadata(result, highlight=False)}"
                    f"Service status:\n"
-                    f"{self._pretty_format_status(result, highlight=False)}"
+                    f"{self.pretty_format_status(result, highlight=False)}"
                ),
            )
            retry_err.add_note(note)
@@ -861,7 +880,7 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
                info_below=(
                    f"Timeout {timeout} (h:mm:ss) waiting for deployment {name}"
                    f" to report {count} replicas available. Last status:\n"
-                    f"{self._pretty_format_status(result, highlight=False)}"
+                    f"{self.pretty_format_status(result, highlight=False)}"
                ),
            )
            retry_err.add_note(note)
@@ -890,7 +909,7 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
                info_below=(
                    f"Timeout {timeout} (h:mm:ss) waiting for pod count"
                    f" {count}, got: {len(result)}. Pod statuses:\n"
-                    f"{self._pretty_format_statuses(result, highlight=False)}"
+                    f"{self.pretty_format_statuses(result, highlight=False)}"
                ),
            )
            retry_err.add_note(note)
@@ -944,7 +963,7 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
                    info_below=(
                        f"Timeout {timeout} (h:mm:ss) waiting for pod"
                        f" {pod_name} to start. Pod status:\n"
-                        f"{self._pretty_format_status(result, highlight=False)}"
+                        f"{self.pretty_format_status(result, highlight=False)}"
                    ),
                )
            )
@@ -989,35 +1008,53 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
        pod_log_collector.start()
        return pod_log_collector
 
-    def _pretty_format_statuses(
+    def pretty_format_statuses(
        self,
        k8s_objects: List[Optional[object]],
        *,
        highlight: bool = True,
    ) -> str:
        return "\n".join(
-            self._pretty_format_status(k8s_object, highlight=highlight)
+            self.pretty_format_status(k8s_object, highlight=highlight)
            for k8s_object in k8s_objects
        )
 
-    def _pretty_format_status(
+    def pretty_format_status(
        self,
        k8s_object: Optional[object],
-        *,
        highlight: bool = True,
    ) -> str:
        if k8s_object is None:
            return "No data"
 
-        # Parse the name if present.
-        if hasattr(k8s_object, "metadata") and hasattr(
-            k8s_object.metadata, "name"
-        ):
-            name = k8s_object.metadata.name
-        else:
-            name = "Can't parse resource name"
+        result = []
+        metadata: Optional[V1ObjectMeta] = None
+        if isinstance(getattr(k8s_object, "metadata", None), V1ObjectMeta):
+            # Parse the name if present.
+            metadata: V1ObjectMeta = k8s_object.metadata
+
+        # Parse the name if present, but always indicate an unsuccessful parse.
+        name = metadata.name if metadata else "Can't parse resource name"
+        result.append(f"Resource name: {name}")
+
+        # Add kubernetes kind (resource type) if present.
+        if hasattr(k8s_object, "kind"):
+            result.append(f"Resource kind: {k8s_object.kind}")
+
+        # Add the timestamps if present.
+        if metadata and metadata.creation_timestamp:
+            result.append(
+                f"Created: {metadata.creation_timestamp};"
+                f" {_helper_datetime.ago(metadata.creation_timestamp)}"
+            )
+        if metadata and metadata.deletion_timestamp:
+            result.append(
+                f"Deletion requested: {metadata.deletion_timestamp};"
+                f" {_helper_datetime.ago(metadata.deletion_timestamp)}"
+            )
 
        # Pretty-print the status if present.
+        result.append("")
        if hasattr(k8s_object, "status"):
            try:
                status = self._pretty_format(
@@ -1030,11 +1067,11 @@ class KubernetesNamespace:  # pylint: disable=too-many-public-methods
                status = f"Can't parse resource status: {e}"
            else:
                status = "Can't parse resource status"
+        result.append(status)
 
-        # Return the name of k8s object, and its pretty-printed status.
- return f"{name}:\n{status}\n" + return "\n".join(result) + "\n" - def _pretty_format_metadata( + def pretty_format_metadata( self, k8s_object: Optional[object], *,