From 9bc421c6cf0d652ee44b5e8365e40cf2a75ce420 Mon Sep 17 00:00:00 2001
From: Lidi Zheng <lidiz@google.com>
Date: Thu, 6 May 2021 10:03:54 -0700
Subject: [PATCH] [xDS interop] Updating the config update timeout to 600s
 (#26090)

* Updating the config update timeout to 600s

* Remove unnecessary lines and comments
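
The retry loops below are restructured from fixed attempt counts to a single
wall-clock deadline derived from _WAIT_FOR_URL_MAP_PATCH_SEC. As a minimal
sketch of the pattern (simplified here; wait_for_expected_distribution and
check_distribution are illustrative placeholders, not helpers defined in
run_xds_tests.py):

    import time

    _WAIT_FOR_URL_MAP_PATCH_SEC = 600  # overall config propagation budget

    def wait_for_expected_distribution(check_distribution):
        # Poll until the expected RPC distribution is observed or the
        # deadline expires; each check takes on the order of 10 seconds.
        deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
        while time.time() < deadline:
            if check_distribution():
                return
        raise Exception('Expected distribution not reached within %d seconds' %
                        _WAIT_FOR_URL_MAP_PATCH_SEC)

test_timeout and test_fault_injection follow the same idea but wrap a whole
sequence of test cases: a failed comparison raises RpcDistributionError, which
is caught and retried until the deadline expires, then re-raised.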
---
 tools/run_tests/run_xds_tests.py | 321 +++++++++++++++++--------------
 1 file changed, 173 insertions(+), 148 deletions(-)

diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index a043c65e117..af10df63fcf 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -279,14 +279,23 @@ CLIENT_HOSTS = []
 if args.client_hosts:
     CLIENT_HOSTS = args.client_hosts.split(',')
 
+# Each config propagation in the control plane should finish within 600s.
+# Otherwise, it indicates a bug in the control plane. Config propagation
+# covers all kinds of traffic config updates, such as updating the urlMap,
+# creating the resources for the first time, updating the BackendService,
+# and changing the status of endpoints in the BackendService.
+_WAIT_FOR_URL_MAP_PATCH_SEC = 600
+# In general, fetching load balancing stats only takes ~10s. However, a slow
+# config update could lead to an empty EDS response or similar symptoms,
+# causing the connection to hang for a long period of time. So, we extend the
+# stats wait time to match the urlMap patch timeout.
+_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC
+
 _DEFAULT_SERVICE_PORT = 80
 _WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
 _WAIT_FOR_OPERATION_SEC = 1200
 _INSTANCE_GROUP_SIZE = args.instance_group_size
 _NUM_TEST_RPCS = 10 * args.qps
-_WAIT_FOR_STATS_SEC = 360
-_WAIT_FOR_VALID_CONFIG_SEC = 60
-_WAIT_FOR_URL_MAP_PATCH_SEC = 300
 _CONNECTION_TIMEOUT_SEC = 60
 _GCP_API_RETRIES = 5
 _BOOTSTRAP_TEMPLATE = """
@@ -827,8 +836,11 @@ def test_round_robin(gcp, backend_service, instance_group):
     # creating new backend resources for each individual test case.
     # Each attempt takes 10 seconds. Config propagation can take several
     # minutes.
-    max_attempts = 40
-    for i in range(max_attempts):
+    deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
+    logger.info(
+        'Waiting up to %d seconds for the expected RPC distribution',
+        _WAIT_FOR_URL_MAP_PATCH_SEC)
+    while time.time() < deadline:
         stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
         requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
         total_requests_received = sum(requests_received)
@@ -843,7 +855,8 @@ def test_round_robin(gcp, backend_service, instance_group):
                     'RPC peer distribution differs from expected by more than %d '
                     'for instance %s (%s)' % (threshold, instance, stats))
         return
-    raise Exception('RPC failures persisted through %d retries' % max_attempts)
+    raise Exception('RPC failures persisted after %s seconds' %
+                    _WAIT_FOR_URL_MAP_PATCH_SEC)
 
 
 def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
@@ -1307,10 +1320,11 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
             _WAIT_FOR_STATS_SEC)
 
         # Verify that weights between two services are expected.
-        retry_count = 10
-        # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
-        # seconds timeout.
-        for i in range(retry_count):
+        deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
+        logger.info(
+            'Waiting up to %d seconds for the expected RPC distribution',
+            _WAIT_FOR_URL_MAP_PATCH_SEC)
+        while time.time() < deadline:
             stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
             got_instance_count = [
                 stats.rpcs_by_peer[i] for i in original_backend_instances
@@ -1324,18 +1338,15 @@ def test_traffic_splitting(gcp, original_backend_service, instance_group,
                 compare_distributions(got_instance_percentage,
                                       expected_instance_percentage, 5)
             except Exception as e:
-                logger.info('attempt %d', i)
-                logger.info('got percentage: %s', got_instance_percentage)
-                logger.info('expected percentage: %s',
+                logger.info('Got percentage: %s, expected percentage: %s',
+                            got_instance_percentage,
                             expected_instance_percentage)
                 logger.info(e)
-                if i == retry_count - 1:
-                    raise Exception(
-                        'RPC distribution (%s) differs from expected (%s)' %
-                        (got_instance_percentage, expected_instance_percentage))
             else:
                 logger.info("success")
-                break
+                return
+        raise Exception('RPC distribution (%s) differs from expected (%s)' %
+                        (got_instance_percentage, expected_instance_percentage))
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         patch_backend_service(gcp, alternate_backend_service, [])
@@ -1459,23 +1470,22 @@ def test_path_matching(gcp, original_backend_service, instance_group,
                 original_backend_instances + alternate_backend_instances,
                 _WAIT_FOR_STATS_SEC)
 
-            retry_count = 80
-            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
-            # seconds timeout.
-            for i in range(retry_count):
+            deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
+            logger.info(
+                'Waiting up to %d seconds for the expected RPC distribution',
+                _WAIT_FOR_URL_MAP_PATCH_SEC)
+            while time.time() < deadline:
                 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                 if not stats.rpcs_by_method:
                     raise ValueError(
                         'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                     )
-                logger.info('attempt %d', i)
                 if compare_expected_instances(stats, expected_instances):
                     logger.info("success")
-                    break
-                elif i == retry_count - 1:
-                    raise Exception(
-                        'timeout waiting for RPCs to the expected instances: %s'
-                        % expected_instances)
+                    return
+            raise Exception(
+                'timeout waiting for RPCs to the expected instances: %s' %
+                expected_instances)
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         patch_backend_service(gcp, alternate_backend_service, [])
@@ -1657,23 +1667,22 @@ def test_header_matching(gcp, original_backend_service, instance_group,
                 original_backend_instances + alternate_backend_instances,
                 _WAIT_FOR_STATS_SEC)
 
-            retry_count = 80
-            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
-            # seconds timeout.
-            for i in range(retry_count):
+            deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
+            logger.info(
+                'Waiting up to %d seconds for the expected RPC distribution',
+                _WAIT_FOR_URL_MAP_PATCH_SEC)
+            while time.time() < deadline:
                 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                 if not stats.rpcs_by_method:
                     raise ValueError(
                         'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                     )
-                logger.info('attempt %d', i)
                 if compare_expected_instances(stats, expected_instances):
                     logger.info("success")
-                    break
-                elif i == retry_count - 1:
-                    raise Exception(
-                        'timeout waiting for RPCs to the expected instances: %s'
-                        % expected_instances)
+                    return
+            raise Exception(
+                'timeout waiting for RPCs to the expected instances: %s' %
+                expected_instances)
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         patch_backend_service(gcp, alternate_backend_service, [])
@@ -1684,7 +1693,7 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group,
     '''
     Since backend service circuit_breakers configuration cannot be unset,
     which causes trouble for restoring validate_for_proxy flag in target
-    proxy/global forwarding rule. This test uses dedicated backend sevices.
+    proxy/global forwarding rule. This test uses dedicated backend services.
     The url_map and backend services undergoes the following state changes:
 
     Before test:
@@ -1911,53 +1920,62 @@ def test_timeout(gcp, original_backend_service, instance_group):
     ]
 
     try:
-        first_case = True
-        for (testcase_name, client_config, expected_results) in test_cases:
-            logger.info('starting case %s', testcase_name)
-            configure_client(**client_config)
-            # wait a second to help ensure the client stops sending RPCs with
-            # the old config.  We will make multiple attempts if it is failing,
-            # but this improves confidence that the test is valid if the
-            # previous client_config would lead to the same results.
-            time.sleep(1)
-            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
-            # second timeout.
-            attempt_count = 20
-            if first_case:
-                attempt_count = 120
-                first_case = False
-            before_stats = get_client_accumulated_stats()
-            if not before_stats.stats_per_method:
-                raise ValueError(
-                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
-                )
-            for i in range(attempt_count):
-                logger.info('%s: attempt %d', testcase_name, i)
-
-                test_runtime_secs = 10
-                time.sleep(test_runtime_secs)
-                after_stats = get_client_accumulated_stats()
-
-                success = True
-                for rpc, status in expected_results.items():
-                    qty = (after_stats.stats_per_method[rpc].result[status] -
-                           before_stats.stats_per_method[rpc].result[status])
-                    want = test_runtime_secs * args.qps
-                    # Allow 10% deviation from expectation to reduce flakiness
-                    if qty < (want * .9) or qty > (want * 1.1):
-                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
-                                    testcase_name, rpc, status, qty, want)
-                        success = False
-                if success:
-                    logger.info('success')
-                    break
-                logger.info('%s attempt %d failed', testcase_name, i)
-                before_stats = after_stats
-            else:
-                raise Exception(
-                    '%s: timeout waiting for expected results: %s; got %s' %
-                    (testcase_name, expected_results,
-                     after_stats.stats_per_method))
+        deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
+        logger.info(
+            'Waiting up to %d seconds for the expected RPC distribution',
+            _WAIT_FOR_URL_MAP_PATCH_SEC)
+        attempt_counter = 0
+        while True:
+            attempt_counter += 1
+            try:
+                for (testcase_name, client_config,
+                     expected_results) in test_cases:
+                    logger.info('starting case %s: attempt %d', testcase_name,
+                                attempt_counter)
+                    configure_client(**client_config)
+                    # wait a second to help ensure the client stops sending RPCs with
+                    # the old config.  We will make multiple attempts if it is failing,
+                    # but this improves confidence that the test is valid if the
+                    # previous client_config would lead to the same results.
+                    time.sleep(1)
+                    before_stats = get_client_accumulated_stats()
+                    if not before_stats.stats_per_method:
+                        raise ValueError(
+                            'stats.stats_per_method is None, the interop client stats service does not support this test case'
+                        )
+                    logger.info('%s: attempt %d', testcase_name,
+                                attempt_counter)
+
+                    test_runtime_secs = 10
+                    time.sleep(test_runtime_secs)
+                    after_stats = get_client_accumulated_stats()
+
+                    success = True
+                    for rpc, status in expected_results.items():
+                        qty = (
+                            after_stats.stats_per_method[rpc].result[status] -
+                            before_stats.stats_per_method[rpc].result[status])
+                        want = test_runtime_secs * args.qps
+                        # Allow 10% deviation from expectation to reduce flakiness
+                        if qty < (want * .9) or qty > (want * 1.1):
+                            logger.info(
+                                '%s: failed due to %s[%s]: got %d want ~%d',
+                                testcase_name, rpc, status, qty, want)
+                            success = False
+                    if success:
+                        logger.info('success')
+                        return
+                    logger.info('%s attempt %d failed', testcase_name,
+                                attempt_counter)
+                    raise RpcDistributionError(
+                        '%s: timeout waiting for expected results: %s; got %s' %
+                        (testcase_name, expected_results,
+                         after_stats.stats_per_method))
+            except RpcDistributionError as e:
+                if time.time() < deadline:
+                    logger.info('Retrying after error: %s', e)
+                else:
+                    raise
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
 
@@ -2080,70 +2098,78 @@ def test_fault_injection(gcp, original_backend_service, instance_group):
     ]
 
     try:
-        first_case = True
-        for (testcase_name, client_config, expected_results) in test_cases:
-            logger.info('starting case %s', testcase_name)
-
-            client_config['metadata'] = [
-                (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
-                 testcase_header, testcase_name)
-            ]
-            client_config['rpc_types'] = [
-                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
-            ]
-            configure_client(**client_config)
-            # wait a second to help ensure the client stops sending RPCs with
-            # the old config.  We will make multiple attempts if it is failing,
-            # but this improves confidence that the test is valid if the
-            # previous client_config would lead to the same results.
-            time.sleep(1)
-            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
-            # second timeout.
-            attempt_count = 20
-            if first_case:
-                attempt_count = 120
-                first_case = False
-            before_stats = get_client_accumulated_stats()
-            if not before_stats.stats_per_method:
-                raise ValueError(
-                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
-                )
-            for i in range(attempt_count):
-                logger.info('%s: attempt %d', testcase_name, i)
-
-                test_runtime_secs = 10
-                time.sleep(test_runtime_secs)
-                after_stats = get_client_accumulated_stats()
-
-                success = True
-                for status, pct in expected_results.items():
-                    rpc = 'UNARY_CALL'
-                    qty = (after_stats.stats_per_method[rpc].result[status] -
-                           before_stats.stats_per_method[rpc].result[status])
-                    want = pct * args.qps * test_runtime_secs
-                    # Allow 10% deviation from expectation to reduce flakiness
-                    VARIANCE_ALLOWED = 0.1
-                    if abs(qty - want) > want * VARIANCE_ALLOWED:
-                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
-                                    testcase_name, rpc, status, qty, want)
-                        success = False
-                if success:
-                    logger.info('success')
-                    break
-                logger.info('%s attempt %d failed', testcase_name, i)
-                before_stats = after_stats
-            else:
-                raise Exception(
-                    '%s: timeout waiting for expected results: %s; got %s' %
-                    (testcase_name, expected_results,
-                     after_stats.stats_per_method))
+        deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
+        logger.info(
+            'Waiting up to %d seconds for the expected RPC distribution',
+            _WAIT_FOR_URL_MAP_PATCH_SEC)
+        attempt_counter = 0
+        while True:
+            attempt_counter += 1
+            try:
+                for (testcase_name, client_config,
+                     expected_results) in test_cases:
+                    logger.info('starting case %s: attempt %d', testcase_name,
+                                attempt_counter)
+
+                    client_config['metadata'] = [
+                        (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+                         testcase_header, testcase_name)
+                    ]
+                    client_config['rpc_types'] = [
+                        messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
+                    ]
+                    configure_client(**client_config)
+                    # wait a second to help ensure the client stops sending RPCs with
+                    # the old config.  We will make multiple attempts if it is failing,
+                    # but this improves confidence that the test is valid if the
+                    # previous client_config would lead to the same results.
+                    time.sleep(1)
+                    # Each attempt takes about 10 seconds; keep retrying until
+                    # the overall deadline computed above expires.
+                    before_stats = get_client_accumulated_stats()
+                    if not before_stats.stats_per_method:
+                        raise ValueError(
+                            'stats.stats_per_method is None, the interop client stats service does not support this test case'
+                        )
+                    logger.info('%s: attempt %d', testcase_name, attempt_counter)
+
+                    test_runtime_secs = 10
+                    time.sleep(test_runtime_secs)
+                    after_stats = get_client_accumulated_stats()
+
+                    success = True
+                    for status, pct in expected_results.items():
+                        rpc = 'UNARY_CALL'
+                        qty = (
+                            after_stats.stats_per_method[rpc].result[status] -
+                            before_stats.stats_per_method[rpc].result[status])
+                        want = pct * args.qps * test_runtime_secs
+                        # Allow 10% deviation from expectation to reduce flakiness
+                        VARIANCE_ALLOWED = 0.1
+                        if abs(qty - want) > want * VARIANCE_ALLOWED:
+                            logger.info(
+                                '%s: failed due to %s[%s]: got %d want ~%d',
+                                testcase_name, rpc, status, qty, want)
+                            success = False
+                    if success:
+                        logger.info('success')
+                        return
+                    logger.info('%s attempt %d failed', testcase_name, attempt_counter)
+                    raise RpcDistributionError(
+                        '%s: timeout waiting for expected results: %s; got %s' %
+                        (testcase_name, expected_results,
+                         after_stats.stats_per_method))
+            except RpcDistributionError as e:
+                if time.time() < deadline:
+                    logger.info('Retrying after error: %s', e)
+                else:
+                    raise
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         set_validate_for_proxyless(gcp, True)
 
 
 def test_csds(gcp, original_backend_service, instance_group, server_uri):
-    test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
     sleep_interval_between_attempts_s = datetime.timedelta(
         seconds=2).total_seconds()
     logger.info('Running test_csds')
@@ -2151,10 +2177,9 @@ def test_csds(gcp, original_backend_service, instance_group, server_uri):
     logger.info('waiting for original backends to become healthy')
     wait_for_healthy_backends(gcp, original_backend_service, instance_group)
 
-    # Test case timeout: 5 minutes
-    deadline = time.time() + test_csds_timeout_s
+    deadline = time.time() + _WAIT_FOR_URL_MAP_PATCH_SEC
     cnt = 0
-    while time.time() <= deadline:
+    while time.time() < deadline:
         client_config = get_client_xds_config_dump()
         logger.info('test_csds attempt %d: received xDS config %s', cnt,
                     json.dumps(client_config, indent=2))
@@ -2226,7 +2251,7 @@ def test_csds(gcp, original_backend_service, instance_group, server_uri):
         cnt += 1
 
     raise RuntimeError('failed to receive a valid xDS config in %s seconds' %
-                       test_csds_timeout_s)
+                       _WAIT_FOR_URL_MAP_PATCH_SEC)
 
 
 def set_validate_for_proxyless(gcp, validate_for_proxyless):