From 6764d1b441c37df6aea15c589be422d93cada38c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 20 May 2020 14:12:24 -0700 Subject: [PATCH] xds interop: add traffic splitting --- tools/run_tests/run_xds_tests.py | 156 ++++++++++++++++++++++++++++++- 1 file changed, 153 insertions(+), 3 deletions(-) diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 9b47b28d1ba..38e71d25889 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -52,6 +52,7 @@ _TEST_CASES = [ 'round_robin', 'secondary_locality_gets_no_requests_on_partial_primary_failure', 'secondary_locality_gets_requests_on_primary_failure', + 'traffic_splitting', ] @@ -103,7 +104,7 @@ argp.add_argument('--zone', default='us-central1-a') argp.add_argument('--secondary_zone', default='us-west1-b', help='Zone to use for secondary TD locality tests') -argp.add_argument('--qps', default=10, type=int, help='Client QPS') +argp.add_argument('--qps', default=100, type=int, help='Client QPS') argp.add_argument( '--wait_for_backend_sec', default=1200, @@ -290,7 +291,7 @@ def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, timeout_sec, - num_rpcs=100): + num_rpcs=_NUM_TEST_RPCS): _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, @@ -299,13 +300,48 @@ def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, def wait_until_all_rpcs_go_to_given_backends(backends, timeout_sec, - num_rpcs=100): + num_rpcs=_NUM_TEST_RPCS): _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, allow_failures=False) +def compareDistributions(actual_distribution, expected_distribution, threshold): + """Compare if two distributions are similar. + + Args: + actual_distribution: A list of floats, contains the actual distribution. + expected_distribution: A list of floats, contains the expected distribution. 
+ threshold: Number within [0,100], the threshold percentage by which the + actual distribution can differ from the expected distribution. + + Returns: + The similarity between the distributions as a boolean. Returns True if the + actual distribution lies within the threshold of the expected + distribution; an Exception is raised otherwise. + + Raises: + ValueError: if threshold is not within [0,100]. + Exception: containing detailed error messages. + """ + if len(expected_distribution) != len(actual_distribution): + raise Exception( + 'Error: expected and actual distributions have different size (%d vs %d)' + % (len(expected_distribution), len(actual_distribution))) + if threshold < 0 or threshold > 100: + raise ValueError('Value error: Threshold should be between 0 to 100') + threshold_fraction = threshold / 100.0 + for expected, actual in zip(expected_distribution, actual_distribution): + if actual < (expected * (1 - threshold_fraction)): + raise Exception("actual(%f) < expected(%f-%d%%)" % + (actual, expected, threshold)) + if actual > (expected * (1 + threshold_fraction)): + raise Exception("actual(%f) > expected(%f+%d%%)" % + (actual, expected, threshold)) + return True + + def test_backends_restart(gcp, backend_service, instance_group): logger.info('Running test_backends_restart') instance_names = get_instance_names(gcp, instance_group) @@ -499,6 +535,87 @@ def test_secondary_locality_gets_requests_on_primary_failure( patch_backend_instances(gcp, backend_service, [primary_instance_group]) +def test_traffic_splitting(gcp, original_backend_service, instance_group, + alternate_backend_service, same_zone_instance_group): + logger.info('Running test_traffic_splitting') + + logger.info('waiting for original to become healthy') + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + + patch_backend_instances(gcp, alternate_backend_service, + [same_zone_instance_group]) + logger.info('waiting for alternate to become healthy') + wait_for_healthy_backends(gcp, 
alternate_backend_service, + same_zone_instance_group) + + original_backend_instances = get_instance_names(gcp, instance_group) + logger.info('original backends instances: %s', original_backend_instances) + + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + logger.info('alternate backends instances: %s', alternate_backend_instances) + + # Start with all traffic going to original_backend_service. + logger.info('waiting for traffic to all go to original') + wait_until_all_rpcs_go_to_given_backends(original_backend_instances, + _WAIT_FOR_STATS_SEC) + + try: + # Patch urlmap, change route action to traffic splitting between + # original and alternate. + logger.info('patching url map with traffic splitting') + expected_service_percentage = [20, 80] + patch_url_map_weighted_backend_services( + gcp, { + original_backend_service: expected_service_percentage[0], + alternate_backend_service: expected_service_percentage[1], + }) + expected_instance_percentage = [ + expected_service_percentage[0] * 1.0 / + len(original_backend_instances) + ] * len(original_backend_instances) + [ + expected_service_percentage[1] * 1.0 / + len(alternate_backend_instances) + ] * len(alternate_backend_instances) + + # Wait for traffic to go to both services. + logger.info( + 'waiting for traffic to go to all backends (including alternate)') + wait_until_all_rpcs_go_to_given_backends( + original_backend_instances + alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + # Verify that the traffic split between the two services is as expected. 
+ retry_count = 3 + for i in range(retry_count): + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + got_instance_count = [ + stats.rpcs_by_peer[i] for i in original_backend_instances + ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances] + total_count = sum(got_instance_count) + got_instance_percentage = [ + x * 100.0 / total_count for x in got_instance_count + ] + + try: + compareDistributions(got_instance_percentage, + expected_instance_percentage, 5) + except Exception as e: + logger.warning('attempt %d', i) + logger.warning('got percentage: %s', got_instance_percentage) + logger.warning('expected percentage: %s', + expected_instance_percentage) + logger.warning(e) + if i == retry_count - 1: + raise Exception( + 'RPC distribution (%s) differs from expected (%s)', + got_instance_percentage, expected_instance_percentage) + else: + logger.info("success") + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_instances(gcp, alternate_backend_service, []) + + def get_startup_script(path_to_server_binary, service_port): if path_to_server_binary: return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary, @@ -947,12 +1064,41 @@ def resize_instance_group(gcp, def patch_url_map_backend_service(gcp, backend_service): + '''change url_map's backend service''' config = { 'defaultService': backend_service.url, 'pathMatchers': [{ 'name': _PATH_MATCHER_NAME, 'defaultService': backend_service.url, + 'defaultRouteAction': None, + }] + } + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.urlMaps().patch( + project=gcp.project, urlMap=gcp.url_map.name, + body=config).execute(num_retries=_GCP_API_RETRIES) + wait_for_global_operation(gcp, result['name']) + + +def patch_url_map_weighted_backend_services(gcp, servicesWithWeights): + ''' + change url_map's only path matcher's default route action + to traffic splitting. servicesWithWeights is a map from service + to weights. 
+ ''' + weightedBackendServices = [{ + 'backendService': service.url, + 'weight': w, + } for service, w in servicesWithWeights.items()] + logger.debug('patching route action to %s', weightedBackendServices) + config = { + 'pathMatchers': [{ + 'name': _PATH_MATCHER_NAME, + 'defaultService': None, + 'defaultRouteAction': { + 'weightedBackendServices': weightedBackendServices + } }] } logger.debug('Sending GCP request with body=%s', config) @@ -1245,6 +1391,10 @@ try: test_secondary_locality_gets_requests_on_primary_failure( gcp, backend_service, instance_group, secondary_zone_instance_group) + elif test_case == 'traffic_splitting': + test_traffic_splitting(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) else: logger.error('Unknown test case: %s', test_case) sys.exit(1)