[PSM Interop] New cleanup script (#33460)

1. Changes the resource retention period to 2 days for all resources
(previously 7 days for TD resources, 6 hours for k8s). This solves a
problem where k8s resources got stuck because the corresponding TD
resources weren't deleted.
2. Resume cleanup when namespace deletion fails, instead of aborting
3. Add secondary lb cluster cleanup logic
4. Modularize `grpc_xds_resource_cleanup.sh`
5. Make `KubernetesNamespace`'s methods `pretty_format_status` and
`pretty_format_metadata` public
6. `pretty_format_status`: also print resource kind, creation and
deletion requested dates

ref b/259724370, cl/517235715
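
The retention change in (1) reduces to a single cutoff comparison applied uniformly to TD and k8s resources; a minimal sketch of the check (names are illustrative, not from this PR):

import datetime

KEEP_HOURS = 48  # new unified retention period: 2 days

def is_expired(created: datetime.datetime) -> bool:
    # Resources created at or before the cutoff are old enough to delete.
    cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
        hours=KEEP_HOURS
    )
    return created <= cutoff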
Sergii Tkachenko 1 year ago committed by GitHub
parent 997c73a6a4
commit 1c4da38d40
  1. tools/internal_ci/linux/grpc_xds_resource_cleanup.sh (173 changed lines)
  2. tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py (382 changed lines)
  3. tools/run_tests/xds_k8s_test_driver/bin/cleanup/namespace.py (42 changed lines)
  4. tools/run_tests/xds_k8s_test_driver/framework/helpers/datetime.py (31 changed lines)
  5. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py (79 changed lines)

@@ -13,61 +13,136 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
set -eo pipefail
# consts
# Constants
readonly GITHUB_REPOSITORY_NAME="grpc"
readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/grpc/${TEST_DRIVER_BRANCH:-master}/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh"
# Keep orphaned resources for 2 days.
readonly KEEP_HOURS="${KEEP_HOURS:-48}"
cd "$(dirname "$0")/../../.."
cleanup::activate_cluster() {
activate_gke_cluster "$1"
gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" \
--zone "${GKE_CLUSTER_ZONE}"
CLEANUP_KUBE_CONTEXT="$(kubectl config current-context)"
}
# Source the test driver from the master branch.
echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}"
source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")"
activate_gke_cluster GKE_CLUSTER_PSM_SECURITY
kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}"
cleanup::activate_secondary_cluster_as_primary() {
activate_secondary_gke_cluster "$1"
GKE_CLUSTER_NAME="${SECONDARY_GKE_CLUSTER_NAME}"
GKE_CLUSTER_ZONE="${SECONDARY_GKE_CLUSTER_ZONE}"
gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" \
--zone "${GKE_CLUSTER_ZONE}"
CLEANUP_KUBE_CONTEXT="$(kubectl config current-context)"
}
cd "${TEST_DRIVER_FULL_DIR}"
cleanup::job::cleanup_td() {
cleanup::run_clean "$1" --mode=td
}
# flag resource_prefix is required by the gke test framework, but doesn't
# matter for the cleanup script.
python3 -m bin.cleanup.cleanup \
--project=grpc-testing \
--network=default-vpc \
--kube_context="${KUBE_CONTEXT}" \
--gcp_service_account=xds-k8s-interop-tests@grpc-testing.iam.gserviceaccount.com \
--resource_prefix='required-but-does-not-matter' \
--td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter'
#######################################
# The PSM_LB cluster is used by k8s_lb tests.
# The keep_hours value is reduced to 6.
#######################################
cleanup::job::cleanup_cluster_lb_primary() {
cleanup::activate_cluster GKE_CLUSTER_PSM_LB
cleanup::run_clean "$1" --mode=k8s
}
# The BASIC cluster is used by url-map tests. Only cleaning the GKE client
# namespaces, which won't provide much value in debugging. The keep hours is
# reduced to 6.
activate_gke_cluster GKE_CLUSTER_PSM_BASIC
# Invoking the get-credentials directly, because the
# gcloud_get_cluster_credentials re-sets readonly Bash variables, which is a
# nice safety mechanism to keep.
gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" --zone "${GKE_CLUSTER_ZONE}"
TARGET_KUBE_CONTEXT="$(kubectl config current-context)"
python3 -m bin.cleanup.namespace \
--project=grpc-testing \
--network=default-vpc \
--keep_hours=6 \
--kube_context="${TARGET_KUBE_CONTEXT}" \
--gcp_service_account=xds-k8s-interop-tests@grpc-testing.iam.gserviceaccount.com \
--resource_prefix='required-but-does-not-matter' \
--td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter'
#######################################
# Secondary PSM_LB cluster is used by k8s_lb tests.
# The keep_hours value is reduced to 6.
#######################################
cleanup::job::cleanup_cluster_lb_secondary() {
cleanup::activate_secondary_cluster_as_primary GKE_CLUSTER_PSM_LB
cleanup::run_clean "$1" --mode=k8s --secondary
}
# The PSM_LB cluster is used by k8s_lb tests. Only cleaning the GKE client
# namespaces, which won't provide much value in debugging. The keep hours is
# reduced to 6.
activate_gke_cluster GKE_CLUSTER_PSM_LB
gcloud container clusters get-credentials "${GKE_CLUSTER_NAME}" --zone "${GKE_CLUSTER_ZONE}"
TARGET_KUBE_CONTEXT="$(kubectl config current-context)"
python3 -m bin.cleanup.namespace \
--project=grpc-testing \
--network=default-vpc \
--keep_hours=6 \
--kube_context="${TARGET_KUBE_CONTEXT}" \
--gcp_service_account=xds-k8s-interop-tests@grpc-testing.iam.gserviceaccount.com \
--resource_prefix='required-but-does-not-matter' \
--td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter'
#######################################
# The BASIC cluster is used by url-map tests. Only cleaning the xds client
# namespaces; the xds server namespaces are shared.
# The keep_hours value is reduced to 6.
#######################################
cleanup::job::cleanup_cluster_url_map() {
cleanup::activate_cluster GKE_CLUSTER_PSM_BASIC
cleanup::run_clean "$1" --mode=k8s
}
#######################################
# The SECURITY cluster is used by the security and authz test suites.
#######################################
cleanup::job::cleanup_cluster_security() {
cleanup::activate_cluster GKE_CLUSTER_PSM_SECURITY
cleanup::run_clean "$1" --mode=k8s
}
#######################################
# Set common variables for the cleanup script.
# Globals:
# TEST_DRIVER_FLAGFILE: Relative path to test driver flagfile
# TEST_XML_OUTPUT_DIR: Output directory for the test xUnit XML report
# CLEANUP_KUBE_CONTEXT: The name of kubectl context with GKE cluster access.
# Arguments:
# Test job name. Currently only used to generate the asset path, and takes
# values from the cleanup_jobs array of main().
# TODO(sergiitk): turn job_name into action test methods of the cleanup.
# Outputs:
# Writes the output of test execution to stdout, stderr,
# ${TEST_XML_OUTPUT_DIR}/${job_name}/sponge_log.log
#######################################
cleanup::run_clean() {
local job_name="${1:?Usage: cleanup::run_clean job_name}"
local out_dir="${TEST_XML_OUTPUT_DIR}/${job_name}"
mkdir -pv "${out_dir}"
# TODO(sergiitk): make it a test, where job_name is a separate method.
python3 -m bin.cleanup.cleanup \
--flagfile="${TEST_DRIVER_FLAGFILE}" \
--kube_context="${CLEANUP_KUBE_CONTEXT:-unset}" \
--keep_hours="${KEEP_HOURS}" \
"${@:2}" \
|& tee "${out_dir}/sponge_log.log"
}
#######################################
# Main function: provision software necessary to execute the cleanup tasks;
# run them, and report the status.
#######################################
main() {
local script_dir
script_dir="$(dirname "$0")"
# Source the test driver from the master branch.
echo "Sourcing test driver install script from: ${TEST_DRIVER_INSTALL_SCRIPT_URL}"
source /dev/stdin <<< "$(curl -s "${TEST_DRIVER_INSTALL_SCRIPT_URL}")"
set +x
# Valid cluster variables needed for the automatic driver setup.
activate_gke_cluster GKE_CLUSTER_PSM_BASIC
kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}"
# Run tests
cd "${TEST_DRIVER_FULL_DIR}"
local failed_jobs=0
declare -a cleanup_jobs
cleanup_jobs=(
"cleanup_td"
"cleanup_cluster_lb_primary"
"cleanup_cluster_lb_secondary"
"cleanup_cluster_security"
"cleanup_cluster_url_map"
)
for job_name in "${cleanup_jobs[@]}"; do
echo "-------------------- Starting job ${job_name} --------------------"
set -x
"cleanup::job::${job_name}" "${job_name}" || (( ++failed_jobs ))
set +x
echo "-------------------- Finished job ${job_name} --------------------"
done
echo "Failed job suites: ${failed_jobs}"
if (( failed_jobs > 0 )); then
exit 1
fi
}
main "$@"

@@ -17,13 +17,12 @@ This is intended as a tool to delete leaked resources from old tests.
Typical usage examples:
python3 tools/run_tests/xds_k8s_test_driver/bin/cleanup/cleanup.py\
--project=grpc-testing\
--network=default-vpc\
python3 -m bin.cleanup.cleanup \
--project=grpc-testing \
--network=default-vpc \
--kube_context=gke_grpc-testing_us-central1-a_psm-interop-security
--resource_prefix='required-but-does-not-matter'\
--td_bootstrap_image='required-but-does-not-matter' --server_image='required-but-does-not-matter' --client_image='required-but-does-not-matter'
"""
import dataclasses
import datetime
import functools
import json
@@ -31,7 +30,8 @@ import logging
import os
import re
import subprocess
from typing import Any, List
import sys
from typing import Any, Callable, List, Optional
from absl import app
from absl import flags
@@ -39,6 +39,7 @@ import dateutil
from framework import xds_flags
from framework import xds_k8s_flags
from framework.helpers import retryers
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
@@ -52,20 +53,31 @@ _KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
GCLOUD = os.environ.get("GCLOUD", "gcloud")
GCLOUD_CMD_TIMEOUT_S = datetime.timedelta(seconds=5).total_seconds()
ZONE = "us-central1-a"
SECONDARY_ZONE = "us-west1-b"
PSM_SECURITY_PREFIX = "psm-interop" # Prefix for gke resources to delete.
URL_MAP_TEST_PREFIX = ( # Prefix for url-map test resources to delete.
"interop-psm-url-map"
# Skip known k8s system namespaces.
K8S_PROTECTED_NAMESPACES = {
"default",
"gke-managed-system",
"kube-node-lease",
"kube-public",
"kube-system",
}
# TODO(sergiitk): these should be flags.
LEGACY_DRIVER_ZONE = "us-central1-a"
LEGACY_DRIVER_SECONDARY_ZONE = "us-west1-b"
PSM_INTEROP_PREFIX = "psm-interop" # Prefix for gke resources to delete.
URL_MAP_TEST_PREFIX = (
"interop-psm-url-map" # Prefix for url-map test resources to delete.
)
KEEP_PERIOD_HOURS = flags.DEFINE_integer(
"keep_hours",
default=168,
default=48,
help=(
"number of hours for a resource to keep. Resources older than this will"
" be deleted. Default is 168 (7 days)"
" be deleted. Default is 48 hours (2 days)"
),
)
DRY_RUN = flags.DEFINE_bool(
@@ -75,7 +87,7 @@ DRY_RUN = flags.DEFINE_bool(
)
TD_RESOURCE_PREFIXES = flags.DEFINE_list(
"td_resource_prefixes",
default=[PSM_SECURITY_PREFIX],
default=[PSM_INTEROP_PREFIX],
help=(
"a comma-separated list of prefixes for which the leaked TD resources"
" will be deleted"
@@ -83,7 +95,7 @@ TD_RESOURCE_PREFIXES = flags.DEFINE_list(
)
SERVER_PREFIXES = flags.DEFINE_list(
"server_prefixes",
default=[PSM_SECURITY_PREFIX],
default=[PSM_INTEROP_PREFIX],
help=(
"a comma-separated list of prefixes for which the leaked servers will"
" be deleted"
@@ -91,12 +103,55 @@ SERVER_PREFIXES = flags.DEFINE_list(
)
CLIENT_PREFIXES = flags.DEFINE_list(
"client_prefixes",
default=[PSM_SECURITY_PREFIX, URL_MAP_TEST_PREFIX],
default=[PSM_INTEROP_PREFIX, URL_MAP_TEST_PREFIX],
help=(
"a comma-separated list of prefixes for which the leaked clients will"
" be deleted"
),
)
MODE = flags.DEFINE_enum(
"mode",
default="td",
enum_values=["k8s", "td", "td_no_legacy"],
help="Mode: Kubernetes or Traffic Director",
)
SECONDARY = flags.DEFINE_bool(
"secondary",
default=False,
help="Cleanup secondary (alternative) resources",
)
# The cleanup script performs some API calls directly, so some flags normally
# required to configure the framework properly are not needed here.
flags.FLAGS.set_default("resource_prefix", "ignored-by-cleanup")
flags.FLAGS.set_default("td_bootstrap_image", "ignored-by-cleanup")
flags.FLAGS.set_default("server_image", "ignored-by-cleanup")
flags.FLAGS.set_default("client_image", "ignored-by-cleanup")
@dataclasses.dataclass(eq=False)
class CleanupResult:
error_count: int = 0
error_messages: List[str] = dataclasses.field(default_factory=list)
def add_error(self, msg: str):
self.error_count += 1
self.error_messages.append(f" {self.error_count}. {msg}")
def format_messages(self):
return "\n".join(self.error_messages)
@dataclasses.dataclass(frozen=True)
class K8sResourceRule:
# regex to match
expression: str
# function to delete the resource
cleanup_ns_fn: Callable
# Global state, holding the result of the whole operation.
_CLEANUP_RESULT = CleanupResult()
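
A minimal sketch of how CleanupResult accumulates failures so the run can continue (the namespace name is illustrative):

result = CleanupResult()
try:
    raise TimeoutError("namespace deletion timed out")  # stand-in for a cleanup call
except Exception as err:  # pylint: disable=broad-except
    result.add_error(f"deleting namespace psm-interop-client-xyz: {err}")

if result.error_count > 0:
    # main() logs this and exits 1, see below.
    print(f"Cleanup failed for {result.error_count} resource(s):")
    print(result.format_messages())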
def load_keep_config() -> None:
@@ -129,7 +184,7 @@ def get_expire_timestamp() -> datetime.datetime:
)
def exec_gcloud(project: str, *cmds: List[str]) -> Json:
def exec_gcloud(project: str, *cmds: str) -> Json:
cmds = [GCLOUD, "--project", project, "--quiet"] + list(cmds)
if "list" in cmds:
# Add arguments to shape the list output
@@ -167,10 +222,10 @@ def exec_gcloud(project: str, *cmds: List[str]) -> Json:
return None
def remove_relative_resources_run_xds_tests(
project: str, network: str, prefix: str, suffix: str
):
def cleanup_legacy_driver_resources(*, project: str, suffix: str, **kwargs):
"""Removing GCP resources created by run_xds_tests.py."""
# Unused, but kept for compatibility with cleanup_td_for_gke.
del kwargs
logging.info(
"----- Removing run_xds_tests.py resources with suffix [%s]", suffix
)
@@ -244,7 +299,7 @@
"delete",
f"test-ig{suffix}",
"--zone",
ZONE,
LEGACY_DRIVER_ZONE,
)
exec_gcloud(
project,
@@ -254,7 +309,7 @@
"delete",
f"test-ig-same-zone{suffix}",
"--zone",
ZONE,
LEGACY_DRIVER_ZONE,
)
exec_gcloud(
project,
@@ -264,7 +319,7 @@
"delete",
f"test-ig-secondary-zone{suffix}",
"--zone",
SECONDARY_ZONE,
LEGACY_DRIVER_SECONDARY_ZONE,
)
exec_gcloud(
project,
@@ -281,21 +336,21 @@
# Note that the variants are all based on the basic TrafficDirectorManager, so
# their `cleanup()` might do duplicate work. But deleting a non-existent
# resource returns 404, which is OK.
def cleanup_td_for_gke(project, network, resource_prefix, resource_suffix):
def cleanup_td_for_gke(*, project, prefix, suffix, network):
gcp_api_manager = gcp.api.GcpApiManager()
plain_td = traffic_director.TrafficDirectorManager(
gcp_api_manager,
project=project,
network=network,
resource_prefix=resource_prefix,
resource_suffix=resource_suffix,
resource_prefix=prefix,
resource_suffix=suffix,
)
security_td = traffic_director.TrafficDirectorSecureManager(
gcp_api_manager,
project=project,
network=network,
resource_prefix=resource_prefix,
resource_suffix=resource_suffix,
resource_prefix=prefix,
resource_suffix=suffix,
)
# TODO: cleanup appnet resources.
# appnet_td = traffic_director.TrafficDirectorAppNetManager(
@@ -307,8 +362,8 @@ def cleanup_td_for_gke(project, network, resource_prefix, resource_suffix):
logger.info(
"----- Removing traffic director for gke, prefix %s, suffix %s",
resource_prefix,
resource_suffix,
prefix,
suffix,
)
security_td.cleanup(force=True)
# appnet_td.cleanup(force=True)
@@ -320,32 +375,42 @@ def cleanup_client(
project,
network,
k8s_api_manager,
resource_prefix,
resource_suffix,
client_namespace,
gcp_api_manager,
gcp_service_account,
*,
suffix: Optional[str] = "",
):
runner_kwargs = dict(
deployment_name=xds_flags.CLIENT_NAME.value,
image_name=xds_k8s_flags.CLIENT_IMAGE.value,
td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
deployment_name = xds_flags.CLIENT_NAME.value
if suffix:
deployment_name = f"{deployment_name}-{suffix}"
ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace)
# Shorten the timeout to avoid waiting for stuck namespaces.
# Normal ns deletion during the cleanup takes less than two minutes.
ns.wait_for_namespace_deleted_timeout_sec = 5 * 60
client_runner = _KubernetesClientRunner(
k8s_namespace=ns,
deployment_name=deployment_name,
gcp_project=project,
gcp_api_manager=gcp.api.GcpApiManager(),
gcp_service_account=gcp_service_account,
xds_server_uri=xds_flags.XDS_SERVER_URI.value,
network=network,
stats_port=xds_flags.CLIENT_PORT.value,
)
client_namespace = _KubernetesClientRunner.make_namespace_name(
resource_prefix, resource_suffix
)
client_runner = _KubernetesClientRunner(
k8s.KubernetesNamespace(k8s_api_manager, client_namespace),
**runner_kwargs,
gcp_service_account=gcp_service_account,
gcp_api_manager=gcp_api_manager,
image_name="",
td_bootstrap_image="",
)
logger.info("Cleanup client")
try:
client_runner.cleanup(force=True, force_namespace=True)
except retryers.RetryError as err:
logger.error(
"Timeout waiting for namespace %s deletion. "
"Failed resource status:\n\n%s",
ns.name,
ns.pretty_format_status(err.result()),
)
raise
# cleanup_server creates a server runner, and calls its cleanup() method.
@@ -353,30 +418,42 @@ def cleanup_server(
project,
network,
k8s_api_manager,
resource_prefix,
resource_suffix,
server_namespace,
gcp_api_manager,
gcp_service_account,
*,
suffix: Optional[str] = "",
):
runner_kwargs = dict(
deployment_name=xds_flags.SERVER_NAME.value,
image_name=xds_k8s_flags.SERVER_IMAGE.value,
td_bootstrap_image=xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value,
deployment_name = xds_flags.SERVER_NAME.value
if suffix:
deployment_name = f"{deployment_name}-{suffix}"
ns = k8s.KubernetesNamespace(k8s_api_manager, server_namespace)
# Shorten the timeout to avoid waiting for stuck namespaces.
# Normal ns deletion during the cleanup takes less than two minutes.
ns.wait_for_namespace_deleted_timeout_sec = 5 * 60
server_runner = _KubernetesServerRunner(
k8s_namespace=ns,
deployment_name=deployment_name,
gcp_project=project,
gcp_api_manager=gcp.api.GcpApiManager(),
gcp_service_account=gcp_service_account,
network=network,
)
server_namespace = _KubernetesServerRunner.make_namespace_name(
resource_prefix, resource_suffix
)
server_runner = _KubernetesServerRunner(
k8s.KubernetesNamespace(k8s_api_manager, server_namespace),
**runner_kwargs,
gcp_service_account=gcp_service_account,
gcp_api_manager=gcp_api_manager,
image_name="",
td_bootstrap_image="",
)
logger.info("Cleanup server")
try:
server_runner.cleanup(force=True, force_namespace=True)
except retryers.RetryError as err:
logger.error(
"Timeout waiting for namespace %s deletion. "
"Failed resource status:\n\n%s",
ns.name,
ns.pretty_format_status(err.result()),
)
raise
def delete_leaked_td_resources(
@@ -390,14 +467,19 @@ def delete_leaked_td_resources(
logging.info("----- Skipped [Dry Run]: %s", resource["name"])
continue
matched = False
for regex, resource_prefix, keep, remove in td_resource_rules:
for regex, resource_prefix, keep, remove_fn in td_resource_rules:
result = re.search(regex, resource["name"])
if result is not None:
matched = True
if keep(result.group(1)):
logging.info("Skipped [keep]:")
break # break inner loop, continue outer loop
remove(project, network, resource_prefix, result.group(1))
remove_fn(
project=project,
prefix=resource_prefix,
suffix=result.group(1),
network=network,
)
break
if not matched:
logging.info(
@@ -414,58 +496,97 @@ def delete_k8s_resources(
gcp_service_account,
namespaces,
):
gcp_api_manager = gcp.api.GcpApiManager()
for ns in namespaces:
namespace_name: str = ns.metadata.name
if namespace_name in K8S_PROTECTED_NAMESPACES:
continue
logger.info("-----")
logger.info("----- Cleaning up k8s namespaces %s", ns.metadata.name)
if ns.metadata.creation_timestamp <= get_expire_timestamp():
logger.info("----- Cleaning up k8s namespaces %s", namespace_name)
if ns.metadata.creation_timestamp > get_expire_timestamp():
logging.info(
"----- Skipped [resource is within expiry date]: %s",
namespace_name,
)
continue
if dry_run:
# Skip deletion for dry-runs
logging.info("----- Skipped [Dry Run]: %s", ns.metadata.name)
continue
matched = False
for regex, resource_prefix, remove in k8s_resource_rules:
result = re.search(regex, ns.metadata.name)
if result is not None:
matched = True
remove(
rule: K8sResourceRule = _rule_match_k8s_namespace(
namespace_name, k8s_resource_rules
)
if not rule:
logging.info(
"----- Skipped [does not matching resource name templates]: %s",
namespace_name,
)
continue
# Cleaning up.
try:
rule.cleanup_ns_fn(
project,
network,
k8s_api_manager,
resource_prefix,
result.group(1),
namespace_name,
gcp_api_manager,
gcp_service_account,
suffix=("alt" if SECONDARY.value else None),
)
break
if not matched:
logging.info(
"----- Skipped [does not matching resource name templates]"
except k8s.NotFound:
logging.warning("----- Skipped [not found]: %s", namespace_name)
except retryers.RetryError as err:
_CLEANUP_RESULT.add_error(
"Retries exhausted while waiting for the "
f"deletion of namespace {namespace_name}: "
f"{err}"
)
logging.exception(
"----- Skipped [cleanup timed out]: %s", namespace_name
)
else:
logging.info("----- Skipped [resource is within expiry date]")
except Exception as err: # noqa pylint: disable=broad-except
_CLEANUP_RESULT.add_error(
"Unexpected error while deleting "
f"namespace {namespace_name}: {err}"
)
logging.exception(
"----- Skipped [cleanup unexpected error]: %s", namespace_name
)
logger.info("-----")
def _rule_match_k8s_namespace(
namespace_name: str, k8s_resource_rules: List[K8sResourceRule]
) -> Optional[K8sResourceRule]:
for rule in k8s_resource_rules:
result = re.search(rule.expression, namespace_name)
if result is not None:
return rule
return None
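
For example, with the default prefixes the rules pair leaked client/server namespaces with the matching deletion callback (the namespace suffix is illustrative):

rules = [
    K8sResourceRule("psm-interop-client-(.*)", cleanup_client),
    K8sResourceRule("psm-interop-server-(.*)", cleanup_server),
]
rule = _rule_match_k8s_namespace("psm-interop-client-20230405-1234", rules)
assert rule is not None and rule.cleanup_ns_fn is cleanup_client
# Non-matching (e.g. protected) namespaces yield None and are skipped.
assert _rule_match_k8s_namespace("kube-system", rules) is None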
def find_and_remove_leaked_k8s_resources(
dry_run, project, network, gcp_service_account
dry_run, project, network, gcp_service_account, k8s_context
):
k8s_resource_rules = [
# items in each tuple, in order
# - regex to match
# - prefix of the resources
# - function to delete the resource
]
k8s_resource_rules: List[K8sResourceRule] = []
for prefix in CLIENT_PREFIXES.value:
k8s_resource_rules.append(
(f"{prefix}-client-(.*)", prefix, cleanup_client),
K8sResourceRule(f"{prefix}-client-(.*)", cleanup_client)
)
for prefix in SERVER_PREFIXES.value:
k8s_resource_rules.append(
(f"{prefix}-server-(.*)", prefix, cleanup_server),
K8sResourceRule(f"{prefix}-server-(.*)", cleanup_server)
)
# Delete leaked k8s namespaces; these usually mean there are leaked test
# clients/servers from the GKE framework.
k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value)
k8s_api_manager = k8s.KubernetesApiManager(k8s_context)
nss = k8s_api_manager.core.list_namespace()
delete_k8s_resources(
dry_run,
@@ -478,38 +599,32 @@ def find_and_remove_leaked_k8s_resources(
)
def main(argv):
if len(argv) > 1:
raise app.UsageError("Too many command-line arguments.")
load_keep_config()
# Must be called before KubernetesApiManager or GcpApiManager init.
xds_flags.set_socket_default_timeout_from_flag()
project: str = xds_flags.PROJECT.value
network: str = xds_flags.NETWORK.value
gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value
dry_run: bool = DRY_RUN.value
def find_and_remove_leaked_td_resources(dry_run, project, network):
cleanup_legacy: bool = MODE.value != "td_no_legacy"
td_resource_rules = [
# items in each tuple, in order
# - regex to match
# - prefix of the resource (only used by gke resources)
# - function to check if the resource should be kept
# - function to delete the resource
]
if cleanup_legacy:
td_resource_rules += [
(
r"test-hc(.*)",
"",
is_marked_as_keep_gce,
remove_relative_resources_run_xds_tests,
cleanup_legacy_driver_resources,
),
(
r"test-template(.*)",
"",
is_marked_as_keep_gce,
remove_relative_resources_run_xds_tests,
cleanup_legacy_driver_resources,
),
]
for prefix in TD_RESOURCE_PREFIXES.value:
td_resource_rules.append(
(
@@ -521,7 +636,8 @@ def main(argv):
)
# List resources older than KEEP_PERIOD. We only list health-checks and
# instance templates because these are leaves in the resource dependency tree.
# instance templates because these are leaves in the resource dependency
# tree.
#
# E.g. forwarding-rule depends on the target-proxy. So leaked
# forwarding-rule indicates there's a leaked target-proxy (because this
@@ -529,31 +645,69 @@ def main(argv):
# leaked target-proxy is guaranteed to be a super set of leaked
# forwarding-rule.
compute = gcp.compute.ComputeV1(gcp.api.GcpApiManager(), project)
leakedHealthChecks = []
leaked_health_checks = []
for item in compute.list_health_check()["items"]:
if (
dateutil.parser.isoparse(item["creationTimestamp"])
<= get_expire_timestamp()
):
leakedHealthChecks.append(item)
leaked_health_checks.append(item)
delete_leaked_td_resources(
dry_run, td_resource_rules, project, network, leakedHealthChecks
dry_run, td_resource_rules, project, network, leaked_health_checks
)
# Delete leaked instance templates; these usually mean there are leaked VMs
# from the GCE framework. Also note that this is only needed for GCE
# resources.
leakedInstanceTemplates = exec_gcloud(
if cleanup_legacy:
leaked_instance_templates = exec_gcloud(
project, "compute", "instance-templates", "list"
)
delete_leaked_td_resources(
dry_run, td_resource_rules, project, network, leakedInstanceTemplates
dry_run,
td_resource_rules,
project,
network,
leaked_instance_templates,
)
def main(argv):
# TODO(sergiitk): instead, base on absltest so that result.xml is available.
if len(argv) > 1:
raise app.UsageError("Too many command-line arguments.")
load_keep_config()
# Must be called before KubernetesApiManager or GcpApiManager init.
xds_flags.set_socket_default_timeout_from_flag()
project: str = xds_flags.PROJECT.value
network: str = xds_flags.NETWORK.value
gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value
dry_run: bool = DRY_RUN.value
k8s_context: str = xds_k8s_flags.KUBE_CONTEXT.value
if MODE.value == "td" or MODE.value == "td_no_legacy":
find_and_remove_leaked_td_resources(dry_run, project, network)
elif MODE.value == "k8s":
# 'unset' value is used in td-only mode to bypass the validation
# for the required flag.
assert k8s_context != "unset"
find_and_remove_leaked_k8s_resources(
dry_run, project, network, gcp_service_account
dry_run, project, network, gcp_service_account, k8s_context
)
logger.info("##################### Done cleaning up #####################")
if _CLEANUP_RESULT.error_count > 0:
logger.error(
"Cleanup failed for %i resource(s). Errors: [\n%s\n].\n"
"Please inspect the log files for stack traces corresponding "
"to these errors.",
_CLEANUP_RESULT.error_count,
_CLEANUP_RESULT.format_messages(),
)
sys.exit(1)
if __name__ == "__main__":

@@ -1,42 +0,0 @@
# Copyright 2022 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Clean up GKE namespaces leaked by the tests."""
from absl import app
from bin.cleanup import cleanup
from framework import xds_flags
from framework import xds_k8s_flags
def main(argv):
if len(argv) > 1:
raise app.UsageError("Too many command-line arguments.")
cleanup.load_keep_config()
# Must be called before KubernetesApiManager or GcpApiManager init.
xds_flags.set_socket_default_timeout_from_flag()
project: str = xds_flags.PROJECT.value
network: str = xds_flags.NETWORK.value
gcp_service_account: str = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value
dry_run: bool = cleanup.DRY_RUN.value
cleanup.find_and_remove_leaked_k8s_resources(
dry_run, project, network, gcp_service_account
)
if __name__ == "__main__":
app.run(main)

@@ -14,7 +14,9 @@
"""This contains common helpers for working with dates and time."""
import datetime
import re
from typing import Pattern
from typing import Optional, Pattern
import dateutil.parser
RE_ZERO_OFFSET: Pattern[str] = re.compile(r"[+\-]00:?00$")
@@ -35,6 +37,11 @@ def iso8601_utc_time(time: datetime.datetime = None) -> str:
return shorten_utc_zone(utc_time.isoformat())
def iso8601_to_datetime(date_str: str) -> datetime.datetime:
# TODO(sergiitk): use regular datetime.datetime when upgraded to py3.11.
return dateutil.parser.isoparse(date_str)
def datetime_suffix(*, seconds: bool = False) -> str:
"""Return current UTC date, and time in a format useful for resource naming.
@@ -48,3 +55,25 @@ def datetime_suffix(*, seconds: bool = False) -> str:
visually distinct from dash-separated date.
"""
return utc_now().strftime("%Y%m%d-%H%M" + ("%S" if seconds else ""))
def ago(date_from: datetime.datetime, now: Optional[datetime.datetime] = None):
if not now:
now = utc_now()
# Round down microseconds.
date_from = date_from.replace(microsecond=0)
now = now.replace(microsecond=0)
# Calculate the diff.
delta: datetime.timedelta = now - date_from
if delta.days > 1:
result = f"{delta.days} days"
elif delta.days > 0:
result = f"{delta.days} day"
else:
# This case covers negative deltas too.
result = f"{delta} (h:mm:ss)"
return f"{result} ago"

@@ -34,6 +34,7 @@ import yaml
import framework.errors
from framework.helpers import retryers
import framework.helpers.datetime
import framework.helpers.highlighter
from framework.infrastructure.k8s_internal import k8s_log_collector
from framework.infrastructure.k8s_internal import k8s_port_forwarder
@@ -41,7 +42,6 @@ from framework.infrastructure.k8s_internal import k8s_port_forwarder
logger = logging.getLogger(__name__)
# Type aliases
_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
PodLogCollector = k8s_log_collector.PodLogCollector
PortForwarder = k8s_port_forwarder.PortForwarder
V1Deployment = client.V1Deployment
@@ -50,6 +50,7 @@ V1Pod = client.V1Pod
V1PodList = client.V1PodList
V1Service = client.V1Service
V1Namespace = client.V1Namespace
V1ObjectMeta = client.V1ObjectMeta
DynResourceInstance = dynamic_res.ResourceInstance
GammaMesh = DynResourceInstance
@@ -60,6 +61,9 @@ GcpSessionAffinityFilter = DynResourceInstance
GcpBackendPolicy = DynResourceInstance
_timedelta = datetime.timedelta
_datetime = datetime.datetime
_helper_datetime = framework.helpers.datetime
_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
_ApiException = client.ApiException
_FailToCreateError = utils.FailToCreateError
@@ -265,6 +269,10 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
WAIT_LONG_SLEEP_SEC: int = 30
WAIT_POD_START_TIMEOUT_SEC: int = 3 * 60
# TODO(sergiitk): Find a better way. Maybe like in framework.rpc.grpc?
wait_for_namespace_deleted_timeout_sec = None
wait_for_namespace_deleted_sleep_sec = None
def __init__(self, api: KubernetesApiManager, name: str):
self._api = api
self._name = name
@@ -761,9 +769,20 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
def wait_for_namespace_deleted(
self,
timeout_sec: int = WAIT_LONG_TIMEOUT_SEC,
wait_sec: int = WAIT_LONG_SLEEP_SEC,
timeout_sec: Optional[int] = None,
wait_sec: Optional[int] = None,
) -> None:
if timeout_sec is None:
if self.wait_for_namespace_deleted_timeout_sec is not None:
timeout_sec = self.wait_for_namespace_deleted_timeout_sec
else:
timeout_sec = self.WAIT_LONG_TIMEOUT_SEC
if wait_sec is None:
if self.wait_for_namespace_deleted_sleep_sec is not None:
wait_sec = self.wait_for_namespace_deleted_sleep_sec
else:
wait_sec = self.WAIT_LONG_SLEEP_SEC
retryer = retryers.constant_retryer(
wait_fixed=_timedelta(seconds=wait_sec),
timeout=_timedelta(seconds=timeout_sec),
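
These instance-level attributes let callers cap the wait without touching the class defaults, which is exactly how cleanup.py uses them earlier in this diff:

ns = k8s.KubernetesNamespace(k8s_api_manager, client_namespace)
# Shorten the timeout to avoid waiting on stuck namespaces; normal
# deletion during cleanup takes less than two minutes.
ns.wait_for_namespace_deleted_timeout_sec = 5 * 60
ns.wait_for_namespace_deleted()  # waits up to 300s instead of WAIT_LONG_TIMEOUT_SEC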
@@ -797,9 +816,9 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
f"\nThis indicates the NEG wasn't created OR"
f" the NEG creation event hasn't propagated to Kubernetes."
f" Service metadata:\n"
f"{self._pretty_format_metadata(result, highlight=False)}"
f"{self.pretty_format_metadata(result, highlight=False)}"
f"Service status:\n"
f"{self._pretty_format_status(result, highlight=False)}"
f"{self.pretty_format_status(result, highlight=False)}"
),
)
retry_err.add_note(note)
@@ -861,7 +880,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
info_below=(
f"Timeout {timeout} (h:mm:ss) waiting for deployment {name}"
f" to report {count} replicas available. Last status:\n"
f"{self._pretty_format_status(result, highlight=False)}"
f"{self.pretty_format_status(result, highlight=False)}"
),
)
retry_err.add_note(note)
@@ -890,7 +909,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
info_below=(
f"Timeout {timeout} (h:mm:ss) waiting for pod count"
f" {count}, got: {len(result)}. Pod statuses:\n"
f"{self._pretty_format_statuses(result, highlight=False)}"
f"{self.pretty_format_status(result, highlight=False)}"
),
)
retry_err.add_note(note)
@@ -944,7 +963,7 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
info_below=(
f"Timeout {timeout} (h:mm:ss) waiting for pod"
f" {pod_name} to start. Pod status:\n"
f"{self._pretty_format_status(result, highlight=False)}"
f"{self.pretty_format_status(result, highlight=False)}"
),
)
)
@@ -989,35 +1008,53 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
pod_log_collector.start()
return pod_log_collector
def _pretty_format_statuses(
def pretty_format_statuses(
self,
k8s_objects: List[Optional[object]],
*,
highlight: bool = True,
) -> str:
return "\n".join(
self._pretty_format_status(k8s_object, highlight=highlight)
self.pretty_format_status(k8s_object, highlight=highlight)
for k8s_object in k8s_objects
)
def _pretty_format_status(
def pretty_format_status(
self,
k8s_object: Optional[object],
*,
highlight: bool = True,
) -> str:
if k8s_object is None:
return "No data"
result = []
metadata: Optional[V1ObjectMeta] = None
if isinstance(getattr(k8s_object, "metadata", None), V1ObjectMeta):
# Parse the name if present.
if hasattr(k8s_object, "metadata") and hasattr(
k8s_object.metadata, "name"
):
name = k8s_object.metadata.name
else:
name = "Can't parse resource name"
metadata: V1ObjectMeta = k8s_object.metadata
# Parse the name if present, but always indicate an unsuccessful parse.
name = metadata.name if metadata else "Can't parse resource name"
result.append(f"Resource name: {name}")
# Add kubernetes kind (resource type) if present.
if hasattr(k8s_object, "kind"):
result.append(f"Resource kind: {k8s_object.kind}")
# Add the timestamps if present.
if metadata and metadata.creation_timestamp:
result.append(
f"Created: {metadata.creation_timestamp};"
f" {_helper_datetime.ago(metadata.creation_timestamp)}"
)
if metadata and metadata.deletion_timestamp:
result.append(
f"Deletion requested: {metadata.deletion_timestamp};"
f" {_helper_datetime.ago(metadata.deletion_timestamp)}"
)
# Pretty-print the status if present.
result.append("")
if hasattr(k8s_object, "status"):
try:
status = self._pretty_format(
@@ -1030,11 +1067,11 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
status = f"Can't parse resource status: {e}"
else:
status = "Can't parse resource status"
result.append(status)
# Return the name of k8s object, and its pretty-printed status.
return f"{name}:\n{status}\n"
return "\n".join(result) + "\n"
def _pretty_format_metadata(
def pretty_format_metadata(
self,
k8s_object: Optional[object],
*,
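
With the added kind and timestamp fields, pretty_format_status output for a stuck namespace now reads roughly like this (values illustrative):

Resource name: psm-interop-client-20230405-1234
Resource kind: Namespace
Created: 2023-04-05 12:34:56+00:00; 3 days ago
Deletion requested: 2023-04-08 12:00:00+00:00; 2:00:00 (h:mm:ss) ago

<pretty-printed status object>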
