Migrating urlMap related test cases to K8s framework (#26367)

* Build url-map test case class & migrate two test cases

- Migrated header matching
- Migrated path matching

* Polish some details

* Give sanity test the trailing line return it wants

* Address comments & add timeout cases

* Add fault injection and CSDS test cases

* Squashed commit of the following:

commit 0bf96a0e3eae59d76a456b7deed2c6c1314f4228
Author: Lidi Zheng <lidiz@google.com>
Date:   Tue Jun 8 12:35:19 2021 -0700

    Bazel

commit 81b4bf926b656295f1db78ab4c2ff934ab5602af
Author: Lidi Zheng <lidiz@google.com>
Date:   Mon Jun 7 00:04:28 2021 -0700

    Enable Bazel

commit 245026ed696357308141944aa8c4c9f5b2013a4b
Author: Lidi Zheng <lidiz@google.com>
Date:   Fri Jun 4 10:37:48 2021 -0700

    Reuse

commit 564d39d28b2201803bfdb379f78e8298623c4029
Author: Lidi Zheng <lidiz@google.com>
Date:   Fri Jun 4 01:44:56 2021 -0700

    Keep`

commit 83c437d72a76b71948314f7d5389aadca39b3d07
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 18:50:47 2021 -0700

    1800s timeout

commit 1e790a53cd01a1e54bf7df6793381812a9c58c01
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 18:45:41 2021 -0700

    Go

commit 68a99640a100ecf2989f0808bd69a8e17f7156bd
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 17:26:29 2021 -0700

    No

commit d4f6fa5d4a1208c9dfac76e58ca15daa64231c2e
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 17:20:46 2021 -0700

    L

commit 5ccd048f185c1f5a8d5fdd4359f3157d8d64ea2a
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 16:30:55 2021 -0700

    Go

commit 411887c7181a593ba3c98fe09bf9f74d52a6ad9c
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 15:51:34 2021 -0700

    Test

commit 8d005e5203e608c18ffe89fb2f60f81b1ba4ffa8
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 15:11:44 2021 -0700

    Go

commit 7c189cb59077ecfb981ca99a8458cf44f3c10526
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 15:11:03 2021 -0700

    Go

commit 8fa65a68ca9d5b71c47c8bdf5367ed3189e49cba
Author: Lidi Zheng <lidiz@google.com>
Date:   Thu Jun 3 13:08:11 2021 -0700

    Go

* Port changes from dev branch and polish

* Remove unused code

* Fix Bazel build

* Finally find out the secret about rpc-behavior

* Prepare for merging

* Fix bash script and use a more serious name

* Enable port forwarding

* Resolve comments

* Reuse existing code && stylish changes

* Update for interface changes

* Refactor to remove load_tests and explicit global variables

* Improve readability

* Disable AlwaysDelay for now

* Fix issues with resource creation

* Fix unexpected behavior of TestLoader.discover

* Add load_tests to timeout_test

* Improve documentation

* I really shouldn't run tests in parallel at this point

* Resolve reviewers' comments && revert DO-NOT-MERGE changes

* Two small cleanups

* Fix GCP resource creation path

* Polish some test logic

* Correct the non-failfast logic

* Put the accidentally deleted copyright line back

* Separate the Kokoro job from security tests

* Deflake configure and get_stats && remove DO-NOT-MERGE changes
pull/26560/head
Lidi Zheng 4 years ago committed by GitHub
parent 6e456ba2af
commit ee4f6854bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      tools/internal_ci/linux/grpc_xds_k8s.sh
  2. 26
      tools/internal_ci/linux/grpc_xds_url_map.cfg
  3. 142
      tools/internal_ci/linux/grpc_xds_url_map.sh
  4. 2
      tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py
  5. 6
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/api.py
  6. 9
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
  7. 6
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/iam.py
  8. 72
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
  9. 58
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py
  10. 4
      tools/run_tests/xds_k8s_test_driver/framework/test_app/base_runner.py
  11. 20
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
  12. 271
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py
  13. 415
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_testcase.py
  14. 13
      tools/run_tests/xds_k8s_test_driver/tests/url_map/__init__.py
  15. 31
      tools/run_tests/xds_k8s_test_driver/tests/url_map/__main__.py
  16. 70
      tools/run_tests/xds_k8s_test_driver/tests/url_map/csds_test.py
  17. 362
      tools/run_tests/xds_k8s_test_driver/tests/url_map/fault_injection_test.py
  18. 357
      tools/run_tests/xds_k8s_test_driver/tests/url_map/header_matching_test.py
  19. 220
      tools/run_tests/xds_k8s_test_driver/tests/url_map/path_matching_test.py
  20. 159
      tools/run_tests/xds_k8s_test_driver/tests/url_map/timeout_test.py

@ -104,7 +104,8 @@ run_test() {
--client_image="${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \
--xml_output_file="${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml" \
--force_cleanup \
--nocheck_local_certs
--nocheck_local_certs \
${@:2}
set +x
}
@ -144,6 +145,10 @@ main() {
cd "${TEST_DRIVER_FULL_DIR}"
run_test baseline_test
run_test security_test
run_test url_map \
--namespace=interop-psm-url-map \
--server_xds_port=8848 \
--debug_use_port_forwarding
}
main "$@"

@ -0,0 +1,26 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_xds_url_map.sh"
timeout_mins: 120
action {
define_artifacts {
regex: "artifacts/**/*sponge_log.xml"
regex: "artifacts/**/*sponge_log.log"
strip_prefix: "artifacts"
}
}

@ -0,0 +1,142 @@
#!/usr/bin/env bash
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex -o igncr || set -ex
# Constants
readonly GITHUB_REPOSITORY_NAME="grpc"
# GKE Cluster
readonly GKE_CLUSTER_NAME="interop-test-psm-sec-v2-us-central1-a"
readonly GKE_CLUSTER_ZONE="us-central1-a"
## xDS test client Docker images
readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/cpp-client"
readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}"
readonly BUILD_APP_PATH="interop-testing/build/install/grpc-interop-testing"
#######################################
# Builds test app Docker images and pushes them to GCR
# Globals:
# BUILD_APP_PATH
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# Arguments:
# None
# Outputs:
# Writes the output of `gcloud builds submit` to stdout, stderr
#######################################
build_test_app_docker_images() {
echo "Building C++ xDS interop test app Docker images"
docker build -f "${SRC_DIR}/tools/dockerfile/interoptest/grpc_interop_cxx_xds/Dockerfile.xds_client" -t "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" "${SRC_DIR}"
gcloud -q auth configure-docker
docker push "${CLIENT_IMAGE_NAME}:${GIT_COMMIT}"
}
#######################################
# Builds test app and its docker images unless they already exist
# Globals:
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# FORCE_IMAGE_BUILD
# Arguments:
# None
# Outputs:
# Writes the output to stdout, stderr
#######################################
build_docker_images_if_needed() {
# Check if images already exist
client_tags="$(gcloud_gcr_list_image_tags "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}")"
printf "Client image: %s:%s\n" "${CLIENT_IMAGE_NAME}" "${GIT_COMMIT}"
echo "${client_tags:-Client image not found}"
# Build if any of the images are missing, or FORCE_IMAGE_BUILD=1
if [[ "${FORCE_IMAGE_BUILD}" == "1" || -z "${client_tags}" ]]; then
build_test_app_docker_images
else
echo "Skipping C++ test app build"
fi
}
#######################################
# Executes the test case
# Globals:
# TEST_DRIVER_FLAGFILE: Relative path to test driver flagfile
# KUBE_CONTEXT: The name of kubectl context with GKE cluster access
# TEST_XML_OUTPUT_DIR: Output directory for the test xUnit XML report
# CLIENT_IMAGE_NAME: Test client Docker image name
# GIT_COMMIT: SHA-1 of git commit being built
# Arguments:
# Test case name
# Outputs:
# Writes the output of test execution to stdout, stderr
# Test xUnit report to ${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml
#######################################
run_test() {
# Test driver usage:
# https://github.com/grpc/grpc/tree/master/tools/run_tests/xds_k8s_test_driver#basic-usage
local test_name="${1:?Usage: run_test test_name}"
set -x
# NOTE(lidiz) we pin the server image to java-server because: 1. only Java
# server understands the rpc-behavior metadata; 2. all UrlMap tests today are
# testing client-side logic.
python -m "tests.${test_name}" \
--flagfile="${TEST_DRIVER_FLAGFILE}" \
--kube_context="${KUBE_CONTEXT}" \
--namespace="interop-psm-url-map" \
--server_xds_port=8848 \
--server_image="gcr.io/grpc-testing/xds-interop/java-server:d22f93e1ade22a1e026b57210f6fc21f7a3ca0cf" \
--client_image="${CLIENT_IMAGE_NAME}:${GIT_COMMIT}" \
--xml_output_file="${TEST_XML_OUTPUT_DIR}/${test_name}/sponge_log.xml" \
--strategy="reuse"
set +x
}
#######################################
# Main function: provision software necessary to execute tests, and run them
# Globals:
# KOKORO_ARTIFACTS_DIR
# GITHUB_REPOSITORY_NAME
# SRC_DIR: Populated with absolute path to the source repo
# TEST_DRIVER_REPO_DIR: Populated with the path to the repo containing
# the test driver
# TEST_DRIVER_FULL_DIR: Populated with the path to the test driver source code
# TEST_DRIVER_FLAGFILE: Populated with relative path to test driver flagfile
# TEST_XML_OUTPUT_DIR: Populated with the path to test xUnit XML report
# GIT_ORIGIN_URL: Populated with the origin URL of git repo used for the build
# GIT_COMMIT: Populated with the SHA-1 of git commit being built
# GIT_COMMIT_SHORT: Populated with the short SHA-1 of git commit being built
# KUBE_CONTEXT: Populated with name of kubectl context with GKE cluster access
# Arguments:
# None
# Outputs:
# Writes the output of test execution to stdout, stderr
#######################################
main() {
local script_dir
script_dir="$(dirname "$0")"
# shellcheck source=tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh
source "${script_dir}/grpc_xds_k8s_install_test_driver.sh"
set -x
if [[ -n "${KOKORO_ARTIFACTS_DIR}" ]]; then
kokoro_setup_test_driver "${GITHUB_REPOSITORY_NAME}"
else
local_setup_test_driver "${script_dir}"
fi
build_docker_images_if_needed
# Run tests
cd "${TEST_DRIVER_FULL_DIR}"
run_test url_map
}
main "$@"

@ -84,7 +84,7 @@ def constant_retryer(*,
if attempts > 0:
stops.append(_stop_after_attempt(attempts))
if timeout is not None:
stops.append(_stop_after_delay.total_seconds())
stops.append(_stop_after_delay(timeout.total_seconds()))
return Retrying(retry=_retry_on_exceptions(retry_on_exceptions),
wait=_wait_fixed(wait_fixed.total_seconds()),

@ -309,7 +309,7 @@ class GcpProjectApiResource:
return retryer(operation_request.execute)
@staticmethod
def _resource_pretty_format(body: dict) -> str:
def resource_pretty_format(body: dict) -> str:
"""Return a string with pretty-printed resource body."""
return yaml.dump(body, explicit_start=True, explicit_end=True)
@ -328,7 +328,7 @@ class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
def _create_resource(self, collection: discovery.Resource, body: dict,
**kwargs):
logger.info("Creating %s resource:\n%s", self.api_name,
self._resource_pretty_format(body))
self.resource_pretty_format(body))
create_req = collection.create(parent=self.parent(),
body=body,
**kwargs)
@ -347,7 +347,7 @@ class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
def _get_resource(self, collection: discovery.Resource, full_name):
resource = collection.get(name=full_name).execute()
logger.info('Loaded %s:\n%s', full_name,
self._resource_pretty_format(resource))
self.resource_pretty_format(resource))
return resource
def _delete_resource(self, collection: discovery.Resource,

@ -181,6 +181,9 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
}],
})
def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
return self._insert_resource(self.api.urlMaps(), url_map_body)
def delete_url_map(self, name):
self._delete_resource(self.api.urlMaps(), 'urlMap', name)
@ -323,19 +326,19 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
**kwargs) -> GcpResource:
resp = collection.get(project=self.project, **kwargs).execute()
logger.info('Loaded compute resource:\n%s',
self._resource_pretty_format(resp))
self.resource_pretty_format(resp))
return self.GcpResource(resp['name'], resp['selfLink'])
def _insert_resource(self, collection: discovery.Resource,
body: Dict[str, Any]) -> GcpResource:
logger.info('Creating compute resource:\n%s',
self._resource_pretty_format(body))
self.resource_pretty_format(body))
resp = self._execute(collection.insert(project=self.project, body=body))
return self.GcpResource(body['name'], resp['targetLink'])
def _patch_resource(self, collection, body, **kwargs):
logger.info('Patching compute resource:\n%s',
self._resource_pretty_format(body))
self.resource_pretty_format(body))
self._execute(
collection.patch(project=self.project, body=body, **kwargs))

@ -217,7 +217,7 @@ class IamV1(gcp.api.GcpProjectApiResource):
request: _HttpRequest = self._service_accounts.get(name=resource_name)
response: Dict[str, Any] = self._execute(request)
logger.debug('Loaded Service Account:\n%s',
self._resource_pretty_format(response))
self.resource_pretty_format(response))
return ServiceAccount.from_response(response)
def get_service_account_iam_policy(self, account: str) -> Policy:
@ -227,7 +227,7 @@ class IamV1(gcp.api.GcpProjectApiResource):
options_requestedPolicyVersion=self.POLICY_VERSION)
response: Dict[str, Any] = self._execute(request)
logger.debug('Loaded Service Account Policy:\n%s',
self._resource_pretty_format(response))
self.resource_pretty_format(response))
return Policy.from_response(response)
def set_service_account_iam_policy(self, account: str,
@ -239,7 +239,7 @@ class IamV1(gcp.api.GcpProjectApiResource):
resource_name = self.service_account_resource_name(account)
body = {'policy': policy.as_dict()}
logger.debug('Updating Service Account %s policy:\n%s', account,
self._resource_pretty_format(body))
self.resource_pretty_format(body))
try:
request: _HttpRequest = self._service_accounts.setIamPolicy(
resource=resource_name, body=body)

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Optional, Set
from typing import Any, List, Optional, Set
from framework import xds_flags
from framework.infrastructure import gcp
@ -42,6 +42,7 @@ EndpointConfigSelector = _NetworkServicesV1Alpha1.EndpointConfigSelector
class TrafficDirectorManager:
compute: _ComputeV1
BACKEND_SERVICE_NAME = "backend-service"
ALTERNATIVE_BACKEND_SERVICE_NAME = "alternative-backend-service"
HEALTH_CHECK_NAME = "health-check"
URL_MAP_NAME = "url-map"
URL_MAP_PATH_MATCHER_NAME = "path-matcher"
@ -77,6 +78,11 @@ class TrafficDirectorManager:
self.target_proxy_is_http: bool = False
self.forwarding_rule: Optional[GcpResource] = None
self.backends: Set[ZonalGcpResource] = set()
self.alternative_backend_service: Optional[GcpResource] = None
# TODO(sergiitk): remove this flag once backend service resource loaded
self.alternative_backend_service_protocol: Optional[
BackendServiceProtocol] = None
self.alternative_backends: Set[ZonalGcpResource] = set()
@property
def network_url(self):
@ -115,6 +121,7 @@ class TrafficDirectorManager:
self.delete_target_grpc_proxy(force=force)
self.delete_url_map(force=force)
self.delete_backend_service(force=force)
self.delete_alternative_backend_service(force=force)
self.delete_health_check(force=force)
def _ns_name(self, name):
@ -202,6 +209,63 @@ class TrafficDirectorManager:
self.compute.wait_for_backends_healthy_status(self.backend_service,
self.backends)
def create_alternative_backend_service(
self, protocol: Optional[BackendServiceProtocol] = _BackendGRPC):
if protocol is None:
protocol = _BackendGRPC
name = self._ns_name(self.ALTERNATIVE_BACKEND_SERVICE_NAME)
logger.info('Creating %s Alternative Backend Service "%s"',
protocol.name, name)
resource = self.compute.create_backend_service_traffic_director(
name, health_check=self.health_check, protocol=protocol)
self.alternative_backend_service = resource
self.alternative_backend_service_protocol = protocol
def load_alternative_backend_service(self):
name = self._ns_name(self.ALTERNATIVE_BACKEND_SERVICE_NAME)
resource = self.compute.get_backend_service_traffic_director(name)
self.alternative_backend_service = resource
def delete_alternative_backend_service(self, force=False):
if force:
name = self._ns_name(self.ALTERNATIVE_BACKEND_SERVICE_NAME)
elif self.alternative_backend_service:
name = self.alternative_backend_service.name
else:
return
logger.info('Deleting Alternative Backend Service "%s"', name)
self.compute.delete_backend_service(name)
self.alternative_backend_service = None
def alternative_backend_service_add_neg_backends(self, name, zones):
logger.info('Waiting for Network Endpoint Groups to load endpoints.')
for zone in zones:
backend = self.compute.wait_for_network_endpoint_group(name, zone)
logger.info('Loaded NEG "%s" in zone %s', backend.name,
backend.zone)
self.alternative_backends.add(backend)
self.alternative_backend_service_add_backends()
def alternative_backend_service_add_backends(self):
logging.info('Adding backends to Backend Service %s: %r',
self.alternative_backend_service.name,
self.alternative_backends)
self.compute.backend_service_add_backends(
self.alternative_backend_service, self.alternative_backends)
def alternative_backend_service_remove_all_backends(self):
logging.info('Removing backends from Backend Service %s',
self.alternative_backend_service.name)
self.compute.backend_service_remove_all_backends(
self.alternative_backend_service)
def wait_for_alternative_backends_healthy_status(self):
logger.debug(
"Waiting for Backend Service %s to report all backends healthy %r",
self.alternative_backend_service, self.alternative_backends)
self.compute.wait_for_backends_healthy_status(
self.alternative_backend_service, self.alternative_backends)
def create_url_map(
self,
src_host: str,
@ -218,6 +282,12 @@ class TrafficDirectorManager:
self.url_map = resource
return resource
def create_url_map_with_content(self, url_map_body: Any) -> GcpResource:
logger.info('Creating URL map: %s', url_map_body)
resource = self.compute.create_url_map_with_content(url_map_body)
self.url_map = resource
return resource
def delete_url_map(self, force=False):
if force:
name = self._ns_name(self.URL_MAP_NAME)

@ -16,7 +16,7 @@ This contains helpers for gRPC services defined in
https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/test.proto
"""
import logging
from typing import Optional
from typing import Iterable, Optional, Tuple
import grpc
@ -27,11 +27,14 @@ from src.proto.grpc.testing import test_pb2_grpc
# Type aliases
_LoadBalancerStatsRequest = messages_pb2.LoadBalancerStatsRequest
LoadBalancerStatsResponse = messages_pb2.LoadBalancerStatsResponse
_LoadBalancerAccumulatedStatsRequest = messages_pb2.LoadBalancerAccumulatedStatsRequest
LoadBalancerAccumulatedStatsResponse = messages_pb2.LoadBalancerAccumulatedStatsResponse
class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
stub: test_pb2_grpc.LoadBalancerStatsServiceStub
STATS_PARTIAL_RESULTS_TIMEOUT_SEC = 1200
STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC = 600
def __init__(self, channel: grpc.Channel):
super().__init__(channel, test_pb2_grpc.LoadBalancerStatsServiceStub)
@ -51,3 +54,56 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
timeout_sec=timeout_sec),
deadline_sec=timeout_sec,
log_level=logging.INFO)
def get_client_accumulated_stats(
self,
*,
timeout_sec: Optional[int] = None
) -> LoadBalancerAccumulatedStatsResponse:
if timeout_sec is None:
timeout_sec = self.STATS_ACCUMULATED_RESULTS_TIMEOUT_SEC
return self.call_unary_with_deadline(
rpc='GetClientAccumulatedStats',
req=_LoadBalancerAccumulatedStatsRequest(),
deadline_sec=timeout_sec,
log_level=logging.INFO)
class XdsUpdateClientConfigureServiceClient(framework.rpc.grpc.GrpcClientHelper
):
stub: test_pb2_grpc.XdsUpdateClientConfigureServiceStub
CONFIGURE_TIMEOUT_SEC: int = 5
def __init__(self, channel: grpc.Channel):
super().__init__(channel,
test_pb2_grpc.XdsUpdateClientConfigureServiceStub)
def configure(
self,
*,
rpc_types: Iterable[str],
metadata: Optional[Iterable[Tuple[str, str, str]]] = None,
app_timeout: Optional[int] = None,
timeout_sec: int = CONFIGURE_TIMEOUT_SEC,
) -> None:
request = messages_pb2.ClientConfigureRequest()
for rpc_type in rpc_types:
request.types.append(
messages_pb2.ClientConfigureRequest.RpcType.Value(rpc_type))
if metadata:
for entry in metadata:
request.metadata.append(
messages_pb2.ClientConfigureRequest.Metadata(
type=messages_pb2.ClientConfigureRequest.RpcType.Value(
entry[0]),
key=entry[1],
value=entry[2],
))
if app_timeout:
request.timeout_sec = app_timeout
# Configure's response is empty
self.call_unary_with_deadline(rpc='Configure',
req=request,
deadline_sec=timeout_sec,
log_level=logging.INFO)

@ -55,7 +55,7 @@ class KubernetesBaseRunner:
def cleanup(self, *, force=False):
if (self.namespace and not self.reuse_namespace) or force:
self._delete_namespace()
self.delete_namespace()
self.namespace = None
@staticmethod
@ -249,7 +249,7 @@ class KubernetesBaseRunner:
self.k8s_namespace.wait_for_service_account_deleted(name)
logger.debug('Service account %s deleted', name)
def _delete_namespace(self, wait_for_deletion=True):
def delete_namespace(self, wait_for_deletion=True):
logger.info('Deleting namespace %s', self.k8s_namespace.name)
try:
self.k8s_namespace.delete()

@ -20,7 +20,7 @@ modules.
import datetime
import functools
import logging
from typing import Iterator, Optional
from typing import Iterable, Optional, Tuple
from framework.helpers import retryers
from framework.infrastructure import gcp
@ -36,6 +36,7 @@ logger = logging.getLogger(__name__)
# Type aliases
_timedelta = datetime.timedelta
_LoadBalancerStatsServiceClient = grpc_testing.LoadBalancerStatsServiceClient
_XdsUpdateClientConfigureServiceClient = grpc_testing.XdsUpdateClientConfigureServiceClient
_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
_ChannelzChannel = grpc_channelz.Channel
_ChannelzChannelState = grpc_channelz.ChannelState
@ -69,6 +70,12 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
return _LoadBalancerStatsServiceClient(self._make_channel(
self.rpc_port))
@property
@functools.lru_cache(None)
def update_config(self):
return _XdsUpdateClientConfigureServiceClient(
self._make_channel(self.rpc_port))
@property
@functools.lru_cache(None)
def channelz(self) -> _ChannelzServiceClient:
@ -91,6 +98,15 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
return self.load_balancer_stats.get_client_stats(
num_rpcs=num_rpcs, timeout_sec=timeout_sec)
def get_load_balancer_accumulated_stats(
self,
*,
timeout_sec: Optional[int] = None,
) -> grpc_testing.LoadBalancerAccumulatedStatsResponse:
"""Shortcut to LoadBalancerStatsServiceClient.get_client_accumulated_stats()"""
return self.load_balancer_stats.get_client_accumulated_stats(
timeout_sec=timeout_sec)
def wait_for_active_server_channel(self) -> _ChannelzChannel:
"""Wait for the channel to the server to transition to READY.
@ -181,7 +197,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
f'Client has no {_ChannelzChannelState.Name(state)} channel with '
'the server')
def get_server_channels(self, **kwargs) -> Iterator[_ChannelzChannel]:
def get_server_channels(self, **kwargs) -> Iterable[_ChannelzChannel]:
return self.channelz.find_channels_for_target(self.server_target,
**kwargs)

@ -0,0 +1,271 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A test framework built for urlMap related xDS test cases."""
import inspect
import functools
from typing import Any, Iterable, List, Mapping, Tuple
from absl import flags
from absl import logging
from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import client_app
from framework.test_app import server_app
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
STRATEGY = flags.DEFINE_enum('strategy',
default='reuse',
enum_values=['create', 'keep', 'reuse'],
help='Strategy of GCP resources management')
# Type alias
UrlMapType = Any
HostRule = Any
PathMatcher = Any
_COMPUTE_V1_URL_PREFIX = 'https://www.googleapis.com/compute/v1'
class _UrlMapChangeAggregator:
"""Where all the urlMap change happens."""
def __init__(self, url_map_name: str):
self._map = {
"name": url_map_name,
"defaultService": GcpResourceManager().default_backend_service(),
"hostRules": [],
"pathMatchers": [],
}
def get_map(self) -> UrlMapType:
return self._map
def apply_change(self, test_case: 'XdsUrlMapTestCase') -> None:
logging.info('Apply urlMap change for test case: %s.%s',
test_case.short_module_name, test_case.__name__)
url_map_parts = test_case.url_map_change(
*self._get_test_case_url_map(test_case))
self._set_test_case_url_map(*url_map_parts)
def _get_test_case_url_map(
self,
test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]:
host_rule = {
"hosts": [test_case.hostname()],
"pathMatcher": test_case.path_matcher_name(),
}
path_matcher = {
"name": test_case.path_matcher_name(),
"defaultService": GcpResourceManager().default_backend_service(),
}
return host_rule, path_matcher
def _set_test_case_url_map(self, host_rule: HostRule,
path_matcher: PathMatcher) -> None:
self._map["hostRules"].append(host_rule)
self._map["pathMatchers"].append(path_matcher)
def _package_flags() -> Mapping[str, Any]:
"""Automatically parse Abseil flags into a dictionary.
Abseil flag is only available after the Abseil app initialization. If we use
__new__ in our metaclass, the flag value parse will happen during the
initialization of modules, hence will fail. That's why we are using __call__
to inject metaclass magics, and the flag parsing will be delayed until the
class is about to be instantiated.
"""
res = {}
for flag_module in [xds_flags, xds_k8s_flags]:
for key, value in inspect.getmembers(flag_module):
if isinstance(value, flags.FlagHolder):
res[key.lower()] = value.value
res['strategy'] = STRATEGY.value
return res
class _MetaSingletonAndAbslFlags(type):
"""Ensures singleton and injects flag values."""
# Allow different subclasses to create different singletons.
_instances = {}
# But we only parse Abseil flags once.
_flags = None
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
if cls._flags is None:
cls._flags = _package_flags()
obj = super().__call__(cls._flags, *args, **kwargs)
cls._instances[cls] = obj
return obj
return cls._instances[cls]
class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
"""Manages the lifecycle of GCP resources.
The GCP resources including:
- 3 K8s deployment (client, default backends, alternative backends)
- Full set of the Traffic Director stuff
- Merged gigantic urlMap from all imported test cases
All resources are intended to be used across test cases and multiple runs
(except the client K8s deployment).
"""
def __init__(self, absl_flags: Mapping[str, Any]):
for key in absl_flags:
setattr(self, key, absl_flags[key])
# API managers
self.k8s_api_manager = k8s.KubernetesApiManager(self.kube_context)
self.gcp_api_manager = gcp.api.GcpApiManager()
self.td = traffic_director.TrafficDirectorManager(
self.gcp_api_manager,
self.project,
resource_prefix=self.namespace,
network=self.network,
)
# Kubernetes namespace
self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
self.namespace)
# Kubernetes Test Client
self.test_client_runner = client_app.KubernetesClientRunner(
self.k8s_namespace,
deployment_name=self.client_name,
image_name=self.client_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
network=self.network,
debug_use_port_forwarding=self.debug_use_port_forwarding,
stats_port=self.client_port,
reuse_namespace=True)
# Kubernetes Test Servers
self.test_server_runner = server_app.KubernetesServerRunner(
self.k8s_namespace,
deployment_name=self.server_name,
image_name=self.server_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
network=self.network)
self.test_server_alternative_runner = server_app.KubernetesServerRunner(
self.k8s_namespace,
deployment_name=self.server_name + '-alternative',
image_name=self.server_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
network=self.network,
reuse_namespace=True)
logging.info('Strategy of GCP resources management: %s', self.strategy)
def _pre_cleanup(self):
# Cleanup existing debris
logging.info('GcpResourceManager: pre clean-up')
self.td.cleanup(force=True)
self.test_client_runner.delete_namespace()
def setup(self, test_case_classes: 'Iterable[XdsUrlMapTestCase]') -> None:
if self.strategy not in ['create', 'keep']:
logging.info('GcpResourceManager: skipping setup for strategy [%s]',
self.strategy)
return
# Construct UrlMap from test classes
# This is the step that mostly likely to go wrong. Lifting it to be the
# first task ensures fail fast.
aggregator = _UrlMapChangeAggregator(
url_map_name="%s-%s" % (self.namespace, self.td.URL_MAP_NAME))
for test_case_class in test_case_classes:
aggregator.apply_change(test_case_class)
final_url_map = aggregator.get_map()
# Clean up debris from previous runs
self._pre_cleanup()
# Start creating GCP resources
logging.info('GcpResourceManager: start setup')
# Firewall
if self.ensure_firewall:
self.td.create_firewall_rule(
allowed_ports=self.firewall_allowed_ports)
# Health Checks
self.td.create_health_check()
# Backend Services
self.td.create_backend_service()
self.td.create_alternative_backend_service()
# UrlMap
self.td.create_url_map_with_content(final_url_map)
# Target Proxy
self.td.create_target_proxy()
# Forwarding Rule
self.td.create_forwarding_rule(self.server_xds_port)
# Kubernetes Test Server
self.test_server_runner.run(
test_port=self.server_port,
maintenance_port=self.server_maintenance_port)
# Kubernetes Test Server Alternative
self.test_server_alternative_runner.run(
test_port=self.server_port,
maintenance_port=self.server_maintenance_port)
# Add backend to default backend service
neg_name, neg_zones = self.k8s_namespace.get_service_neg(
self.test_server_runner.service_name, self.server_port)
self.td.backend_service_add_neg_backends(neg_name, neg_zones)
# Add backend to alternative backend service
neg_name, neg_zones = self.k8s_namespace.get_service_neg(
self.test_server_alternative_runner.service_name, self.server_port)
self.td.alternative_backend_service_add_neg_backends(
neg_name, neg_zones)
# Wait for healthy backends
self.td.wait_for_backends_healthy_status()
self.td.wait_for_alternative_backends_healthy_status()
def cleanup(self) -> None:
if self.strategy not in ['create']:
logging.info(
'GcpResourceManager: skipping tear down for strategy [%s]',
self.strategy)
return
logging.info('GcpResourceManager: start tear down')
if hasattr(self, 'td'):
self.td.cleanup(force=True)
if hasattr(self, 'test_client_runner'):
self.test_client_runner.cleanup(force=True)
if hasattr(self, 'test_server_runner'):
self.test_server_runner.cleanup(force=True)
if hasattr(self, 'test_server_alternative_runner'):
self.test_server_alternative_runner.cleanup(force=True,
force_namespace=True)
@functools.lru_cache(None)
def default_backend_service(self) -> str:
"""Returns default backend service URL."""
self.td.load_backend_service()
return self.td.backend_service.url
@functools.lru_cache(None)
def alternative_backend_service(self) -> str:
"""Returns alternative backend service URL."""
self.td.load_alternative_backend_service()
return self.td.alternative_backend_service.url

@ -0,0 +1,415 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A test framework built for urlMap related xDS test cases."""
import abc
from dataclasses import dataclass
import datetime
import json
import os
import unittest
import sys
import time
from typing import Any, Iterable, Mapping, Optional, Tuple, Union
from absl import flags
from absl import logging
from absl.testing import absltest
from google.protobuf import json_format
import grpc
from framework import xds_k8s_testcase
from framework import xds_url_map_test_resources
from framework.helpers import retryers
from framework.rpc import grpc_testing
from framework.test_app import client_app
# Load existing flags
flags.adopt_module_key_flags(xds_k8s_testcase)
flags.adopt_module_key_flags(xds_url_map_test_resources)
# Define urlMap specific flags
QPS = flags.DEFINE_integer('qps', default=25, help='The QPS client is sending')
# Test configs
_URL_MAP_PROPAGATE_TIMEOUT_SEC = 600
_URL_MAP_PROPAGATE_CHECK_INTERVAL_SEC = 2
URL_MAP_TESTCASE_FILE_SUFFIX = '_test.py'
_CLIENT_CONFIGURE_WAIT_SEC = 2
# Type aliases
XdsTestClient = client_app.XdsTestClient
GcpResourceManager = xds_url_map_test_resources.GcpResourceManager
HostRule = xds_url_map_test_resources.HostRule
PathMatcher = xds_url_map_test_resources.PathMatcher
JsonType = Any
# ProtoBuf translatable RpcType enums
RpcTypeUnaryCall = 'UNARY_CALL'
RpcTypeEmptyCall = 'EMPTY_CALL'
def _split_camel(s: str, delimiter: str = '-') -> str:
"""Turn camel case name to snake-case-like name."""
return ''.join(delimiter + c.lower() if c.isupper() else c
for c in s).lstrip(delimiter)
class DumpedXdsConfig(dict):
"""A convenience class to check xDS config.
Feel free to add more pre-compute fields.
"""
def __init__(self, xds_json: JsonType):
super().__init__(xds_json)
self.json_config = xds_json
self.lds = None
self.rds = None
self.cds = []
self.eds = []
self.endpoints = []
for xds_config in self['xdsConfig']:
try:
if 'listenerConfig' in xds_config:
self.lds = xds_config['listenerConfig']['dynamicListeners'][
0]['activeState']['listener']
elif 'routeConfig' in xds_config:
self.rds = xds_config['routeConfig']['dynamicRouteConfigs'][
0]['routeConfig']
elif 'clusterConfig' in xds_config:
for cluster in xds_config['clusterConfig'][
'dynamicActiveClusters']:
self.cds.append(cluster['cluster'])
elif 'endpointConfig' in xds_config:
for endpoint in xds_config['endpointConfig'][
'dynamicEndpointConfigs']:
self.eds.append(endpoint['endpointConfig'])
except Exception as e:
logging.debug('Parse dumped xDS config failed with %s: %s',
type(e), e)
for endpoint_config in self.eds:
for endpoint in endpoint_config.get('endpoints', {}):
for lb_endpoint in endpoint.get('lbEndpoints', {}):
try:
if lb_endpoint['healthStatus'] == 'HEALTHY':
self.endpoints.append(
'%s:%s' % (lb_endpoint['endpoint']['address']
['socketAddress']['address'],
lb_endpoint['endpoint']['address']
['socketAddress']['portValue']))
except Exception as e:
logging.debug('Parse endpoint failed with %s: %s',
type(e), e)
def __str__(self) -> str:
return json.dumps(self, indent=2)
class RpcDistributionStats:
"""A convenience class to check RPC distribution.
Feel free to add more pre-compute fields.
"""
num_failures: int
num_oks: int
default_service_rpc_count: int
alternative_service_rpc_count: int
unary_call_default_service_rpc_count: int
empty_call_default_service_rpc_count: int
unary_call_alternative_service_rpc_count: int
empty_call_alternative_service_rpc_count: int
def __init__(self, json_lb_stats: JsonType):
self.num_failures = json_lb_stats.get('numFailures', 0)
self.num_oks = 0
self.default_service_rpc_count = 0
self.alternative_service_rpc_count = 0
self.unary_call_default_service_rpc_count = 0
self.empty_call_default_service_rpc_count = 0
self.unary_call_alternative_service_rpc_count = 0
self.empty_call_alternative_service_rpc_count = 0
if 'rpcsByMethod' in json_lb_stats:
for rpc_type in json_lb_stats['rpcsByMethod']:
for peer in json_lb_stats['rpcsByMethod'][rpc_type][
'rpcsByPeer']:
count = json_lb_stats['rpcsByMethod'][rpc_type][
'rpcsByPeer'][peer]
self.num_oks += count
if rpc_type == 'UnaryCall':
if 'alternative' in peer:
self.unary_call_alternative_service_rpc_count = count
self.alternative_service_rpc_count += count
else:
self.unary_call_default_service_rpc_count = count
self.default_service_rpc_count += count
else:
if 'alternative' in peer:
self.empty_call_alternative_service_rpc_count = count
self.alternative_service_rpc_count += count
else:
self.empty_call_default_service_rpc_count = count
self.default_service_rpc_count += count
@dataclass
class ExpectedResult:
"""Describes the expected result of assertRpcStatusCode method below."""
rpc_type: str = RpcTypeUnaryCall
status_code: grpc.StatusCode = grpc.StatusCode.OK
ratio: float = 1
class _MetaXdsUrlMapTestCase(type):
"""Tracking test case subclasses."""
# Automatic discover of all subclasses
_test_case_classes = []
_test_case_names = set()
# Keep track of started and finished test cases, so we know when to setup
# and tear down GCP resources.
_started_test_cases = set()
_finished_test_cases = set()
def __new__(cls, name: str, bases: Iterable[Any],
attrs: Mapping[str, Any]) -> Any:
# Hand over the tracking objects
attrs['test_case_classes'] = cls._test_case_classes
attrs['test_case_names'] = cls._test_case_names
attrs['started_test_cases'] = cls._started_test_cases
attrs['finished_test_cases'] = cls._finished_test_cases
# Handle the test name reflection
module_name = os.path.split(
sys.modules[attrs['__module__']].__file__)[-1]
if module_name.endswith(URL_MAP_TESTCASE_FILE_SUFFIX):
module_name = module_name.replace(URL_MAP_TESTCASE_FILE_SUFFIX, '')
attrs['short_module_name'] = module_name.replace('_', '-')
# Create the class and track
new_class = type.__new__(cls, name, bases, attrs)
if name.startswith('Test'):
cls._test_case_names.add(name)
cls._test_case_classes.append(new_class)
else:
logging.debug('Skipping test case class: %s', name)
return new_class
class XdsUrlMapTestCase(absltest.TestCase, metaclass=_MetaXdsUrlMapTestCase):
"""XdsUrlMapTestCase is the base class for urlMap related tests.
The subclass is expected to implement 3 methods:
- url_map_change: Updates the urlMap components for this test case
- xds_config_validate: Validates if the client received legit xDS configs
- rpc_distribution_validate: Validates if the routing behavior is correct
"""
@staticmethod
@abc.abstractmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
"""Updates the dedicated urlMap components for this test case.
Each test case will have a dedicated HostRule, where the hostname is
generated from the test case name. The HostRule will be linked to a
PathMatcher, where stores the routing logic.
Args:
host_rule: A HostRule GCP resource as a JSON dict.
path_matcher: A PathMatcher GCP resource as a JSON dict.
Returns:
A tuple contains the updated version of given HostRule and
PathMatcher.
"""
pass
@abc.abstractmethod
def xds_config_validate(self, xds_config: DumpedXdsConfig) -> None:
"""Validates received xDS config, if anything is wrong, raise.
This stage only ends when the control plane failed to send a valid
config within a given time range, like 600s.
Args:
xds_config: A DumpedXdsConfig instance can be used as a JSON dict,
but also provides helper fields for commonly checked xDS config.
"""
pass
@abc.abstractmethod
def rpc_distribution_validate(self, client: XdsTestClient) -> None:
"""Validates the routing behavior, if any is wrong, raise.
Args:
client: A XdsTestClient instance for all sorts of end2end testing.
"""
pass
@classmethod
def hostname(cls):
return "%s.%s:%s" % (cls.short_module_name, _split_camel(
cls.__name__), GcpResourceManager().server_xds_port)
@classmethod
def path_matcher_name(cls):
# Path matcher name must match r'(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)'
return "%s-%s-pm" % (cls.short_module_name, _split_camel(cls.__name__))
@classmethod
def setUpClass(cls):
if not cls.started_test_cases:
# Create the GCP resource once before the first test start
GcpResourceManager().setup(cls.test_case_classes)
cls.started_test_cases.add(cls.__name__)
# TODO(lidiz) concurrency is possible, pending multiple-instance change
GcpResourceManager().test_client_runner.cleanup(force=True)
# Sending both RPCs when starting.
cls.test_client = GcpResourceManager().test_client_runner.run(
server_target=f'xds:///{cls.hostname()}',
rpc='UnaryCall,EmptyCall',
qps=QPS.value,
print_response=True)
@classmethod
def tearDownClass(cls):
GcpResourceManager().test_client_runner.cleanup(force=True)
cls.finished_test_cases.add(cls.__name__)
if cls.finished_test_cases == cls.test_case_names:
# Tear down the GCP resource after all tests finished
GcpResourceManager().cleanup()
def _fetch_and_check_xds_config(self):
# Cleanup state for this attempt
self._xds_json_config = None
# Fetch client config
config = self.test_client.csds.fetch_client_status(
log_level=logging.INFO)
self.assertIsNotNone(config)
# Found client config, test it.
self._xds_json_config = json_format.MessageToDict(config)
try:
self.xds_config_validate(DumpedXdsConfig(self._xds_json_config))
except Exception as e:
# Log the exception for debugging purposes.
if type(self._last_xds_config_exception) != type(e) or str(
self._last_xds_config_exception) != str(e):
# Suppress repetitive exception logs
logging.exception(e)
self._last_xds_config_exception = e
raise
return
def run(self, result: unittest.TestResult = None) -> None:
"""Abort this test case if CSDS check is failed.
This prevents the test runner to waste time on RPC distribution test,
and yields clearer signal.
"""
if result.failures or result.errors:
logging.info('Aborting %s', self.__class__.__name__)
else:
super().run(result)
def test_client_config(self):
self._last_xds_config_exception = None
retryer = retryers.constant_retryer(
wait_fixed=datetime.timedelta(
seconds=_URL_MAP_PROPAGATE_CHECK_INTERVAL_SEC),
timeout=datetime.timedelta(seconds=_URL_MAP_PROPAGATE_TIMEOUT_SEC),
logger=logging,
log_level=logging.INFO)
try:
retryer(self._fetch_and_check_xds_config)
finally:
logging.info(
'latest xDS config:\n%s',
GcpResourceManager().td.compute.resource_pretty_format(
self._xds_json_config))
def test_rpc_distribution(self):
self.rpc_distribution_validate(self.test_client)
@staticmethod
def configure_and_send(test_client: XdsTestClient,
*,
rpc_types: Iterable[str],
metadata: Optional[Iterable[Tuple[str, str,
str]]] = None,
app_timeout: Optional[int] = None,
num_rpcs: int) -> RpcDistributionStats:
test_client.update_config.configure(rpc_types=rpc_types,
metadata=metadata,
app_timeout=app_timeout)
# Configure RPC might race with get stats RPC on slower machines.
time.sleep(_CLIENT_CONFIGURE_WAIT_SEC)
json_lb_stats = json_format.MessageToDict(
test_client.get_load_balancer_stats(num_rpcs=num_rpcs))
logging.info(
'Received LoadBalancerStatsResponse from test client %s:\n%s',
test_client.ip, json.dumps(json_lb_stats, indent=2))
return RpcDistributionStats(json_lb_stats)
def assertNumEndpoints(self, xds_config: DumpedXdsConfig, k: int) -> None:
self.assertLen(
xds_config.endpoints, k,
f'insufficient endpoints in EDS: want={k} seen={xds_config.endpoints}'
)
def assertRpcStatusCode(self, test_client: XdsTestClient, *,
expected: Iterable[ExpectedResult], length: int,
tolerance: float) -> None:
"""Assert the distribution of RPC statuses over a period of time."""
# Sending with pre-set QPS for a period of time
before_stats = test_client.get_load_balancer_accumulated_stats()
logging.info(
'Received LoadBalancerAccumulatedStatsResponse from test client %s: before:\n%s',
test_client.ip, before_stats)
time.sleep(length)
after_stats = test_client.get_load_balancer_accumulated_stats()
logging.info(
'Received LoadBalancerAccumulatedStatsResponse from test client %s: after: \n%s',
test_client.ip, after_stats)
# Validate the diff
for expected_result in expected:
rpc = expected_result.rpc_type
status = expected_result.status_code.value[0]
# Compute observation
seen_after = after_stats.stats_per_method.get(rpc, {}).result.get(
status, 0)
seen_before = before_stats.stats_per_method.get(rpc, {}).result.get(
status, 0)
seen = seen_after - seen_before
# Compute total number of RPC started
stats_per_method_after = after_stats.stats_per_method.get(
rpc, {}).result.items()
total_after = sum(
x[1] for x in stats_per_method_after) # (status_code, count)
stats_per_method_before = before_stats.stats_per_method.get(
rpc, {}).result.items()
total_before = sum(
x[1] for x in stats_per_method_before) # (status_code, count)
total = total_after - total_before
# Compute and validate the number
want = total * expected_result.ratio
diff_ratio = abs(seen - want) / total
self.assertLessEqual(
diff_ratio, tolerance,
'Expect rpc [%s] to return [%s] at %.2f ratio: seen=%d want=%d total=%d diff_ratio=%.4f > %.2f'
% (rpc, expected_result.status_code, expected_result.ratio,
seen, want, total, diff_ratio, tolerance))

@ -0,0 +1,13 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

@ -0,0 +1,31 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from absl.testing import absltest
from framework import xds_url_map_testcase # Needed for xDS flags
_TEST_CASE_FOLDER = os.path.dirname(__file__)
def load_tests(loader: absltest.TestLoader, unused_tests, unused_pattern):
return loader.discover(_TEST_CASE_FOLDER,
pattern='*' +
xds_url_map_testcase.URL_MAP_TESTCASE_FILE_SUFFIX)
if __name__ == '__main__':
absltest.main()

@ -0,0 +1,70 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Tuple
from absl import flags
from absl.testing import absltest
from framework import xds_url_map_testcase
from framework.test_app import client_app
# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_url_map_testcase)
_NUM_RPCS = 50
class TestBasicCsds(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
# Validate Endpoint Configs
self.assertNumEndpoints(xds_config, 1)
# Validate Node
self.assertEqual(self.test_client.ip,
xds_config['node']['metadata']['INSTANCE_IP'])
# Validate Listeners
self.assertIsNotNone(xds_config.lds)
self.assertEqual(self.hostname(), xds_config.lds['name'])
# Validate Route Configs
self.assertTrue(xds_config.rds['virtualHosts'])
# Validate Clusters
self.assertEqual(1, len(xds_config.cds))
self.assertEqual('EDS', xds_config.cds[0]['type'])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
num_rpcs=_NUM_RPCS)
self.assertEqual(_NUM_RPCS, rpc_distribution.num_oks)
if __name__ == '__main__':
absltest.main()

@ -0,0 +1,362 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Tuple
from absl import flags
from absl.testing import absltest
import grpc
from framework import xds_url_map_testcase
from framework.rpc import grpc_testing
from framework.test_app import client_app
# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
ExpectedResult = xds_url_map_testcase.ExpectedResult
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_url_map_testcase)
# The first batch of RPCs don't count towards the result of test case. They are
# meant to prove the communication between driver and client is fine.
_NUM_RPCS = 10
_LENGTH_OF_RPC_SENDING_SEC = 16
# We are using sleep to synchronize test driver and the client... Even though
# the client is sending at QPS rate, we can't assert that exactly QPS *
# SLEEP_DURATION number of RPC is finished. The final completed RPC might be
# slightly more or less.
_NON_RANDOM_ERROR_TOLERANCE = 0.01
# For random generator involved test cases, we want to be more loose about the
# final result. Otherwise, we will need more test duration (sleep duration) and
# more accurate communication mechanism. The accurate of random number
# generation is not the intention of this test.
_ERROR_TOLERANCE = 0.2
_DELAY_CASE_APPLICATION_TIMEOUT_SEC = 1
_BACKLOG_WAIT_TIME_SEC = 20
def _build_fault_injection_route_rule(abort_percentage: int = 0,
delay_percentage: int = 0):
return {
'priority': 0,
'matchRules': [{
'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
}],
'service': GcpResourceManager().default_backend_service(),
'routeAction': {
'faultInjectionPolicy': {
'abort': {
'httpStatus': 401,
'percentage': abort_percentage,
},
'delay': {
'fixedDelay': {
'seconds': '20'
},
'percentage': delay_percentage,
}
}
},
}
def _wait_until_backlog_cleared(test_client: XdsTestClient,
timeout: int = _BACKLOG_WAIT_TIME_SEC):
""" Wait until the completed RPC is close to started RPC.
For delay injected test cases, there might be a backlog of RPCs due to slow
initialization of the client. E.g., if initialization took 20s and qps is
25, then there will be a backlog of 500 RPCs. In normal test cases, this is
fine, because RPCs will fail immediately. But for delay injected test cases,
the RPC might linger much longer and affect the stability of test results.
"""
logger.info('Waiting for RPC backlog to clear for %d seconds', timeout)
deadline = time.time() + timeout
while time.time() < deadline:
stats = test_client.get_load_balancer_accumulated_stats()
ok = True
for rpc_type in [RpcTypeUnaryCall, RpcTypeEmptyCall]:
started = stats.num_rpcs_started_by_method.get(rpc_type, 0)
completed = stats.num_rpcs_succeeded_by_method.get(
rpc_type, 0) + stats.num_rpcs_failed_by_method.get(rpc_type, 0)
# We consider the backlog is healthy, if the diff between started
# RPCs and completed RPCs is less than 1.5 QPS.
if abs(started - completed) > xds_url_map_testcase.QPS.value * 1.1:
logger.info(
'RPC backlog exist: rpc_type=%s started=%s completed=%s',
rpc_type, started, completed)
time.sleep(_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
ok = False
else:
logger.info(
'RPC backlog clear: rpc_type=%s started=%s completed=%s',
rpc_type, started, completed)
if ok:
# Both backlog of both types of RPCs is clear, success, return.
return
raise RuntimeError('failed to clear RPC backlog in %s seconds', timeout)
class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [
_build_fault_injection_route_rule(abort_percentage=0,
delay_percentage=0)
]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 1)
filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
'typedPerFilterConfig']['envoy.filters.http.fault']
self.assertEqual('20s', filter_config['delay']['fixedDelay'])
self.assertEqual(
0, filter_config['delay']['percentage'].get('numerator', 0))
self.assertEqual('MILLION',
filter_config['delay']['percentage']['denominator'])
self.assertEqual(401, filter_config['abort']['httpStatus'])
self.assertEqual(
0, filter_config['abort']['percentage'].get('numerator', 0))
self.assertEqual('MILLION',
filter_config['abort']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.OK,
ratio=1),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_NON_RANDOM_ERROR_TOLERANCE)
class TestNonMatchingFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
"""EMPTY_CALL is not fault injected, so it should succeed."""
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [
_build_fault_injection_route_rule(abort_percentage=100,
delay_percentage=100)
]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 1)
# The first route rule for UNARY_CALL is fault injected
self.assertEqual(
"/grpc.testing.TestService/UnaryCall",
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['path'])
filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
'typedPerFilterConfig']['envoy.filters.http.fault']
self.assertEqual('20s', filter_config['delay']['fixedDelay'])
self.assertEqual(1000000,
filter_config['delay']['percentage']['numerator'])
self.assertEqual('MILLION',
filter_config['delay']['percentage']['denominator'])
self.assertEqual(401, filter_config['abort']['httpStatus'])
self.assertEqual(1000000,
filter_config['abort']['percentage']['numerator'])
self.assertEqual('MILLION',
filter_config['abort']['percentage']['denominator'])
# The second route rule for all other RPCs is untouched
self.assertNotIn(
'envoy.filters.http.fault',
xds_config.rds['virtualHosts'][0]['routes'][1].get(
'typedPerFilterConfig', {}))
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeEmptyCall],
num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeEmptyCall,
status_code=grpc.StatusCode.OK,
ratio=1),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_NON_RANDOM_ERROR_TOLERANCE)
@absltest.skip('20% RPC might pass immediately, reason unknown')
class TestAlwaysDelay(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [
_build_fault_injection_route_rule(abort_percentage=0,
delay_percentage=100)
]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 1)
filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
'typedPerFilterConfig']['envoy.filters.http.fault']
self.assertEqual('20s', filter_config['delay']['fixedDelay'])
self.assertEqual(1000000,
filter_config['delay']['percentage']['numerator'])
self.assertEqual('MILLION',
filter_config['delay']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS,
app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
_wait_until_backlog_cleared(test_client)
self.assertRpcStatusCode(
test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.DEADLINE_EXCEEDED,
ratio=1),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_NON_RANDOM_ERROR_TOLERANCE)
class TestAlwaysAbort(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [
_build_fault_injection_route_rule(abort_percentage=100,
delay_percentage=0)
]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 1)
filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
'typedPerFilterConfig']['envoy.filters.http.fault']
self.assertEqual(401, filter_config['abort']['httpStatus'])
self.assertEqual(1000000,
filter_config['abort']['percentage']['numerator'])
self.assertEqual('MILLION',
filter_config['abort']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(
test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.UNAUTHENTICATED,
ratio=1),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_NON_RANDOM_ERROR_TOLERANCE)
class TestDelayHalf(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [
_build_fault_injection_route_rule(abort_percentage=0,
delay_percentage=50)
]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 1)
filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
'typedPerFilterConfig']['envoy.filters.http.fault']
self.assertEqual('20s', filter_config['delay']['fixedDelay'])
self.assertEqual(500000,
filter_config['delay']['percentage']['numerator'])
self.assertEqual('MILLION',
filter_config['delay']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS,
app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
_wait_until_backlog_cleared(test_client)
self.assertRpcStatusCode(
test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.DEADLINE_EXCEEDED,
ratio=0.5),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_ERROR_TOLERANCE)
class TestAbortHalf(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [
_build_fault_injection_route_rule(abort_percentage=50,
delay_percentage=0)
]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 1)
filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][
'typedPerFilterConfig']['envoy.filters.http.fault']
self.assertEqual(401, filter_config['abort']['httpStatus'])
self.assertEqual(500000,
filter_config['abort']['percentage']['numerator'])
self.assertEqual('MILLION',
filter_config['abort']['percentage']['denominator'])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(
test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.UNAUTHENTICATED,
ratio=0.5),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_ERROR_TOLERANCE)
if __name__ == '__main__':
absltest.main()

@ -0,0 +1,357 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Tuple
from absl import flags
from absl.testing import absltest
from framework import xds_url_map_testcase
from framework.test_app import client_app
# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_url_map_testcase)
_NUM_RPCS = 150
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
_TEST_METADATA = (
(RpcTypeUnaryCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_UNARY),
(RpcTypeEmptyCall, _TEST_METADATA_KEY, _TEST_METADATA_VALUE_EMPTY),
(RpcTypeUnaryCall, _TEST_METADATA_NUMERIC_KEY,
_TEST_METADATA_NUMERIC_VALUE),
)
class TestExactMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Header ExactMatch -> alternate_backend_service.
# EmptyCall is sent with the metadata.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_KEY,
'exactMatch': _TEST_METADATA_VALUE_EMPTY
}]
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['name'], _TEST_METADATA_KEY)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['exactMatch'], _TEST_METADATA_VALUE_EMPTY)
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeEmptyCall],
metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.empty_call_alternative_service_rpc_count)
@absltest.skip('the xDS config is good, but distribution is wrong.')
class TestPrefixMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Header PrefixMatch -> alternate_backend_service.
# UnaryCall is sent with the metadata.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_KEY,
'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
}]
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['name'], _TEST_METADATA_KEY)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['prefixMatch'], _TEST_METADATA_VALUE_UNARY[:2])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall],
metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count)
class TestSuffixMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Header SuffixMatch -> alternate_backend_service.
# EmptyCall is sent with the metadata.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_KEY,
'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
}]
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['name'], _TEST_METADATA_KEY)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['suffixMatch'], _TEST_METADATA_VALUE_EMPTY[-2:])
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeEmptyCall],
metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.empty_call_alternative_service_rpc_count)
class TestPresentMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Header 'xds_md_numeric' present -> alternate_backend_service.
# UnaryCall is sent with the metadata, so will be sent to alternative.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_NUMERIC_KEY,
'presentMatch': True
}]
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['name'], _TEST_METADATA_NUMERIC_KEY)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['presentMatch'], True)
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall],
metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count)
class TestInvertMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Header invert ExactMatch -> alternate_backend_service.
# UnaryCall is sent with the metadata, so will be sent to
# default. EmptyCall will be sent to alternative.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_KEY,
'exactMatch': _TEST_METADATA_VALUE_UNARY,
'invertMatch': True
}]
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['name'], _TEST_METADATA_KEY)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['invertMatch'], True)
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS)
self.assertEqual(_NUM_RPCS, rpc_distribution.num_oks)
self.assertEqual(
0, rpc_distribution.unary_call_alternative_service_rpc_count)
self.assertEqual(0,
rpc_distribution.empty_call_default_service_rpc_count)
class TestRangeMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
# UnaryCall is sent with the metadata in range.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName': _TEST_METADATA_NUMERIC_KEY,
'rangeMatch': {
'rangeStart': '100',
'rangeEnd': '200'
}
}]
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['name'], _TEST_METADATA_NUMERIC_KEY)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['rangeMatch']['start'], '100')
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['rangeMatch']['end'], '200')
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS)
self.assertEqual(_NUM_RPCS, rpc_distribution.num_oks)
self.assertEqual(0,
rpc_distribution.unary_call_default_service_rpc_count)
self.assertEqual(
0, rpc_distribution.empty_call_alternative_service_rpc_count)
class TestRegexMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Header RegexMatch -> alternate_backend_service.
# EmptyCall is sent with the metadata.
'matchRules': [{
'prefixMatch':
'/',
'headerMatches': [{
'headerName':
_TEST_METADATA_KEY,
'regexMatch':
"^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
_TEST_METADATA_VALUE_EMPTY[-2:])
}]
}],
'service': GcpResourceManager().alternative_backend_service()
}],
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['name'], _TEST_METADATA_KEY)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['headers']
[0]['safeRegexMatch']['regex'], "^%s.*%s$" %
(_TEST_METADATA_VALUE_EMPTY[:2], _TEST_METADATA_VALUE_EMPTY[-2:]))
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeEmptyCall],
metadata=_TEST_METADATA,
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.empty_call_alternative_service_rpc_count)
if __name__ == '__main__':
absltest.main()

@ -0,0 +1,220 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Tuple
from absl import flags
from absl.testing import absltest
from framework import xds_url_map_testcase
from framework.test_app import client_app
# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_url_map_testcase)
_NUM_RPCS = 150
class TestFullPathMatchEmptyCall(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# FullPath EmptyCall -> alternate_backend_service.
'matchRules': [{
'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['path'],
"/grpc.testing.TestService/EmptyCall")
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeEmptyCall],
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.empty_call_alternative_service_rpc_count)
class TestFullPathMatchUnaryCall(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# FullPath EmptyCall -> alternate_backend_service.
'matchRules': [{
'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['path'],
"/grpc.testing.TestService/UnaryCall")
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count)
class TestTwoRoutesAndPrefixMatch(xds_url_map_testcase.XdsUrlMapTestCase):
"""This test case is similar to the one above (but with route services
swapped). This test has two routes (full_path and the default) to match
EmptyCall, and both routes set alternative_backend_service as the action.
This forces the client to handle duplicate Clusters in the RDS response."""
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [
{
'priority': 0,
# Prefix UnaryCall -> default_backend_service.
'matchRules': [{
'prefixMatch': '/grpc.testing.TestService/Unary'
}],
'service': GcpResourceManager().default_backend_service()
},
{
'priority': 1,
# FullPath EmptyCall -> alternate_backend_service.
'matchRules': [{
'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
}],
'service': GcpResourceManager().alternative_backend_service()
}
]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['prefix'],
"/grpc.testing.TestService/Unary")
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][1]['match']['path'],
"/grpc.testing.TestService/EmptyCall")
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
num_rpcs=_NUM_RPCS)
self.assertEqual(0, rpc_distribution.num_failures)
self.assertEqual(
0, rpc_distribution.unary_call_alternative_service_rpc_count)
self.assertEqual(0,
rpc_distribution.empty_call_default_service_rpc_count)
class TestRegexMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# Regex UnaryCall -> alternate_backend_service.
'matchRules': [{
'regexMatch':
'^\/.*\/UnaryCall$' # Unary methods with any services.
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['safeRegex']
['regex'], '^\/.*\/UnaryCall$')
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.unary_call_alternative_service_rpc_count)
class TestCaseInsensitiveMatch(xds_url_map_testcase.XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher["routeRules"] = [{
'priority': 0,
# ignoreCase EmptyCall -> alternate_backend_service.
'matchRules': [{
# Case insensitive matching.
'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
'ignoreCase': True,
}],
'service': GcpResourceManager().alternative_backend_service()
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 2)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']['path'],
'/gRpC.tEsTinG.tEstseRvice/empTycaLl')
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['match']
['caseSensitive'], False)
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(test_client,
rpc_types=[RpcTypeEmptyCall],
num_rpcs=_NUM_RPCS)
self.assertEqual(
_NUM_RPCS,
rpc_distribution.empty_call_alternative_service_rpc_count)
if __name__ == '__main__':
absltest.main()

@ -0,0 +1,159 @@
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Tuple
import unittest
from absl import flags
from absl.testing import absltest
import grpc
from framework import xds_k8s_flags
from framework import xds_url_map_testcase
from framework.test_app import client_app
# Type aliases
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
ExpectedResult = xds_url_map_testcase.ExpectedResult
XdsTestClient = client_app.XdsTestClient
XdsUrlMapTestCase = xds_url_map_testcase.XdsUrlMapTestCase
logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_url_map_testcase)
# The first batch of RPCs don't count towards the result of test case. They are
# meant to prove the communication between driver and client is fine.
_NUM_RPCS = 25
_LENGTH_OF_RPC_SENDING_SEC = 10
_ERROR_TOLERANCE = 0.1
class _BaseXdsTimeOutTestCase(XdsUrlMapTestCase):
@staticmethod
def url_map_change(
host_rule: HostRule,
path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
path_matcher['routeRules'] = [{
'priority': 0,
'matchRules': [{
'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
}],
'service': GcpResourceManager().default_backend_service(),
'routeAction': {
'maxStreamDuration': {
'seconds': 3,
},
},
}]
return host_rule, path_matcher
def xds_config_validate(self, xds_config: DumpedXdsConfig):
self.assertNumEndpoints(xds_config, 1)
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['route']
['maxStreamDuration']['maxStreamDuration'], '3s')
self.assertEqual(
xds_config.rds['virtualHosts'][0]['routes'][0]['route']
['maxStreamDuration']['grpcTimeoutHeaderMax'], '3s')
def rpc_distribution_validate(self, unused_test_client):
raise NotImplementedError()
# TODO(lidiz) either add support for rpc-behavior to other languages, or we
# should always use Java server as backend.
@absltest.skipUnless('java-server' in xds_k8s_flags.SERVER_IMAGE.value,
'Only Java server supports the rpc-behavior metadata.')
class TestTimeoutInRouteRule(_BaseXdsTimeOutTestCase):
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
# UnaryCall and EmptyCall both sleep-4.
# UnaryCall timeouts, EmptyCall succeeds.
metadata=(
(RpcTypeUnaryCall, 'rpc-behavior', 'sleep-4'),
(RpcTypeEmptyCall, 'rpc-behavior', 'sleep-4'),
),
num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(
test_client,
expected=(
ExpectedResult(rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.DEADLINE_EXCEEDED),
ExpectedResult(rpc_type=RpcTypeEmptyCall,
status_code=grpc.StatusCode.OK),
),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_ERROR_TOLERANCE)
@absltest.skipUnless('java-server' in xds_k8s_flags.SERVER_IMAGE.value,
'Only Java server supports the rpc-behavior metadata.')
class TestTimeoutInApplication(_BaseXdsTimeOutTestCase):
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
rpc_types=[RpcTypeUnaryCall],
# UnaryCall only with sleep-2; timeout=1s; calls timeout.
metadata=((RpcTypeUnaryCall, 'rpc-behavior', 'sleep-2'),),
app_timeout=1,
num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(
test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.DEADLINE_EXCEEDED),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_ERROR_TOLERANCE)
class TestTimeoutNotExceeded(_BaseXdsTimeOutTestCase):
def rpc_distribution_validate(self, test_client: XdsTestClient):
rpc_distribution = self.configure_and_send(
test_client,
# UnaryCall only with no sleep; calls succeed.
rpc_types=[RpcTypeUnaryCall],
num_rpcs=_NUM_RPCS)
self.assertRpcStatusCode(test_client,
expected=(ExpectedResult(
rpc_type=RpcTypeUnaryCall,
status_code=grpc.StatusCode.OK),),
length=_LENGTH_OF_RPC_SENDING_SEC,
tolerance=_ERROR_TOLERANCE)
def load_tests(loader: absltest.TestLoader, unused_tests, unused_pattern):
suite = unittest.TestSuite()
test_cases = [
TestTimeoutInRouteRule, TestTimeoutInApplication, TestTimeoutNotExceeded
]
for test_class in test_cases:
tests = loader.loadTestsFromTestCase(test_class)
suite.addTests(tests)
return suite
if __name__ == '__main__':
absltest.main()
Loading…
Cancel
Save