From b16fa809e93c6066b5978fedfba9d10041ad91a5 Mon Sep 17 00:00:00 2001 From: Xuan Wang Date: Mon, 23 Oct 2023 09:32:18 -0700 Subject: [PATCH] [PSM Interop] Add a step to wait for an active XDS channel when starting the test client. (#34631) * Logs when XDS channel check passed: ``` I1010 22:53:35.013700 140608769881920 client_app.py:278] [psm-grpc-client-9b5756c77-4gv6d] Waiting to report an active channel to trafficdirector.googleapis.com:443 I1010 22:53:38.879174 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: I1010 22:53:49.002596 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: I1010 22:53:59.130141 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: I1010 22:54:09.253418 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: I1010 22:54:19.386313 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: I1010 22:54:35.517963 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: I1010 22:55:00.638522 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: I1010 22:55:00.638787 140608769881920 client_app.py:314] [psm-grpc-client-9b5756c77-4gv6d] Found an active XDS channel I1010 22:55:00.638983 140608769881920 client_app.py:288] [psm-grpc-client-9b5756c77-4gv6d] Channel to trafficdirector.googleapis.com:443 transitioned to active I1010 22:55:00.639290 140608769881920 client_app.py:240] [psm-grpc-client-9b5756c77-4gv6d] Waiting to report a READY channel to xds:///psm-grpc-server:8080 I1010 22:55:00.749331 140608769881920 client_app.py:347] [psm-grpc-client-9b5756c77-4gv6d] Server channel: ``` * Error Message when XDS channel check failed: ``` E1010 23:09:15.111581 140016347227968 base_testcase.py:60] ERROR Traceback in __main__.BaselineTest.test_traffic_director_grpc_setup: Traceback (most recent call last): File 
"/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py", line 53, in test_traffic_director_grpc_setup test_client: _XdsTestClient = self.startTestClient(test_server) File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py", line 787, in startTestClient return self._start_test_client(test_server.xds_uri, **kwargs) File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py", line 798, in _start_test_client test_client.wait_for_active_xds_channel( File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py", line 171, in wait_for_active_xds_channel return self.wait_for_xds_channel_active( File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py", line 283, in wait_for_xds_channel_active channel = retryer( File "/usr/local/google/home/xuanwn/.pyenv/versions/310xds/lib/python3.10/site-packages/tenacity/__init__.py", line 423, in __call__ do = self.iter(retry_state=retry_state) File "/usr/local/google/home/xuanwn/.pyenv/versions/310xds/lib/python3.10/site-packages/tenacity/__init__.py", line 369, in iter return self.retry_error_callback(retry_state=retry_state) File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py", line 141, in error_handler raise RetryError( framework.helpers.retryers.RetryError: Retry error calling framework.test_app.client_app.XdsTestClient.find_active_xds_channel: timeout 0:05:00 (h:mm:ss) exceeded. 
Last exception: ChannelNotActive: [psm-grpc-client-755fc5b468-qkh22] Client has no active channel with xds server trafficdirector.googleapis.com:443 ``` --------- Co-authored-by: Sergii Tkachenko --- .../xds_k8s_test_driver/bin/run_channelz.py | 4 +- .../xds_k8s_test_driver/bin/run_ping_pong.py | 2 +- .../framework/bootstrap_generator_testcase.py | 2 +- .../framework/rpc/grpc_channelz.py | 4 + .../framework/test_app/client_app.py | 143 +++++++++++++++++- .../framework/xds_k8s_testcase.py | 13 +- .../tests/security_test.py | 4 +- 7 files changed, 161 insertions(+), 11 deletions(-) diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py index 27cf788f67b..62ab103a0d6 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py +++ b/tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py @@ -153,7 +153,7 @@ def debug_security_setup_negative(test_client): def debug_security_setup_positive(test_client, test_server): """Debug positive cases: mTLS, TLS, Plaintext.""" - test_client.wait_for_active_server_channel() + test_client.wait_for_server_channel_ready() client_sock: _Socket = test_client.get_active_server_channel_socket() server_sock: _Socket = test_server.get_server_socket_matching_client( client_sock @@ -181,7 +181,7 @@ def debug_security_setup_positive(test_client, test_server): def debug_basic_setup(test_client, test_server): """Show channel and server socket pair""" - test_client.wait_for_active_server_channel() + test_client.wait_for_server_channel_ready() client_sock: _Socket = test_client.get_active_server_channel_socket() server_sock: _Socket = test_server.get_server_socket_matching_client( client_sock diff --git a/tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py b/tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py index af5cbb5da98..c321e389907 100755 --- a/tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py +++ 
b/tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py @@ -79,7 +79,7 @@ def get_client_rpc_stats( def run_ping_pong(test_client: _XdsTestClient, num_rpcs: int): - test_client.wait_for_active_server_channel() + test_client.wait_for_server_channel_ready() lb_stats = get_client_rpc_stats(test_client, num_rpcs) for backend, rpcs_count in lb_stats.rpcs_by_peer.items(): if int(rpcs_count) < 1: diff --git a/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py index 96ff1864fb3..8c26d47fdd8 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py @@ -178,5 +178,5 @@ class BootstrapGeneratorBaseTest(xds_k8s_testcase.XdsKubernetesBaseTestCase): test_client = self.client_runner.run( server_target=test_server.xds_uri, **kwargs ) - test_client.wait_for_active_server_channel() + test_client.wait_for_server_channel_ready() return test_client diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py index 29de0ca1950..59e14786dfd 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py @@ -107,6 +107,10 @@ class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper): result = f"" return result diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index fce48ef7b4e..81b2345830b 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -44,6 +44,8 @@ _CsdsClient = grpc_csds.CsdsClient # Use in get_load_balancer_stats request to request all metadata. 
REQ_LB_STATS_METADATA_ALL = ("*",) +DEFAULT_TD_XDS_URI = "trafficdirector.googleapis.com:443" + class XdsTestClient(framework.rpc.grpc.GrpcApp): """ @@ -129,7 +131,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): timeout_sec=timeout_sec ) - def wait_for_active_server_channel( + def wait_for_server_channel_ready( self, *, timeout: Optional[_timedelta] = None, @@ -155,6 +157,33 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): ) raise + def wait_for_active_xds_channel( + self, + *, + xds_server_uri: Optional[str] = None, + timeout: Optional[_timedelta] = None, + rpc_deadline: Optional[_timedelta] = None, + ) -> _ChannelzChannel: + """Wait until the xds channel is active or timeout. + + Raises: + GrpcApp.NotFound: If the channel to xds never transitioned to active. + """ + try: + return self.wait_for_xds_channel_active( + xds_server_uri=xds_server_uri, + timeout=timeout, + rpc_deadline=rpc_deadline, + ) + except retryers.RetryError as retry_err: + if isinstance(retry_err.exception(), self.ChannelNotFound): + retry_err.add_note( + framework.errors.FrameworkError.note_blanket_error( + "The client couldn't connect to the xDS control plane." + ) + ) + raise + def get_active_server_channel_socket(self) -> _ChannelzSocket: channel = self.find_server_channel_with_state( _ChannelzChannelState.READY @@ -230,6 +259,82 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): ) return channel + def wait_for_xds_channel_active( + self, + *, + xds_server_uri: Optional[str] = None, + timeout: Optional[_timedelta] = None, + rpc_deadline: Optional[_timedelta] = None, + ) -> _ChannelzChannel: + if not xds_server_uri: + xds_server_uri = DEFAULT_TD_XDS_URI + # When polling for a state, prefer smaller wait times to avoid + # exhausting all allowed time on a single long RPC. 
+ if rpc_deadline is None: + rpc_deadline = _timedelta(seconds=30) + + retryer = retryers.exponential_retryer_with_timeout( + wait_min=_timedelta(seconds=10), + wait_max=_timedelta(seconds=25), + timeout=_timedelta(minutes=5) if timeout is None else timeout, + ) + + logger.info( + "[%s] ADS: Waiting for successful calls to xDS control plane to %s", + self.hostname, + xds_server_uri, + ) + channel = retryer( + self.find_active_xds_channel, + xds_server_uri=xds_server_uri, + rpc_deadline=rpc_deadline, + ) + logger.info( + "[%s] ADS: Detected successful calls to xDS control plane %s", + self.hostname, + xds_server_uri, + ) + return channel + + def find_active_xds_channel( + self, + xds_server_uri: str, + *, + rpc_deadline: Optional[_timedelta] = None, + ) -> _ChannelzChannel: + rpc_params = {} + if rpc_deadline is not None: + rpc_params["deadline_sec"] = rpc_deadline.total_seconds() + + for channel in self.get_server_channels(xds_server_uri, **rpc_params): + logger.info( + "[%s] xDS control plane channel: %s", + self.hostname, + _ChannelzServiceClient.channel_repr(channel), + ) + + try: + channel = self.check_channel_successful_calls( + channel, **rpc_params + ) + logger.info( + "[%s] Detected successful calls to xDS control plane: %s", + self.hostname, + xds_server_uri, + ) + except self.NotFound as e: + # Otherwise, keep searching. + continue + + return channel + + raise self.ChannelNotActive( + f"[{self.hostname}] Client has no" + f" active channel with xds server {xds_server_uri}", + src=self.hostname, + dst=xds_server_uri, + ) + def find_server_channel_with_state( self, expected_state: _ChannelzChannelState, @@ -312,6 +417,24 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): subchannels.append(subchannel) return subchannels + def check_channel_successful_calls( + self, channel: _ChannelzChannel, **kwargs + ) -> _ChannelzChannel: + """Checks if the channel has any successful calls. 
+ + We consider the channel is active if channel is in READY state and calls_started is + greater than calls_failed. + """ + if ( + channel.data.state.state is _ChannelzChannelState.READY + and channel.data.calls_started > channel.data.calls_failed + ): + return channel + + raise self.NotFound( + f"[{self.hostname}] Not found successful calls over the channel." + ) + class ChannelNotFound(framework.rpc.grpc.GrpcApp.NotFound): """Channel with expected status not found""" @@ -332,3 +455,21 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): self.dst = dst self.expected_state = expected_state super().__init__(message, src, dst, expected_state, **kwargs) + + class ChannelNotActive(framework.rpc.grpc.GrpcApp.NotFound): + """No active channel was found""" + + src: str + dst: str + + def __init__( + self, + message: str, + *, + src: str, + dst: str, + **kwargs, + ): + self.src = src + self.dst = dst + super().__init__(message, src, dst, **kwargs) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index 29167ecc816..b3779e3842b 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -826,13 +826,18 @@ class RegularXdsKubernetesTestCase(IsolatedXdsKubernetesTestCase): self, server_target: str, *, + wait_for_active_ads_timeout: Optional[_timedelta] = None, wait_for_active_channel_timeout: Optional[_timedelta] = None, **kwargs, ) -> XdsTestClient: test_client = self.client_runner.run( server_target=server_target, **kwargs ) - test_client.wait_for_active_server_channel( + test_client.wait_for_active_xds_channel( + xds_server_uri=self.xds_server_uri, + timeout=wait_for_active_ads_timeout, + ) + test_client.wait_for_server_channel_ready( timeout=wait_for_active_channel_timeout, ) return test_client @@ -956,14 +961,14 @@ class SecurityXdsKubernetesTestCase(IsolatedXdsKubernetesTestCase): 
self, test_server: XdsTestServer, *, - wait_for_active_server_channel=True, + wait_for_server_channel_ready=True, **kwargs, ) -> XdsTestClient: test_client = self.client_runner.run( server_target=test_server.xds_uri, secure_mode=True, **kwargs ) - if wait_for_active_server_channel: - test_client.wait_for_active_server_channel() + if wait_for_server_channel_ready: + test_client.wait_for_server_channel_ready() return test_client def assertTestAppSecurity( diff --git a/tools/run_tests/xds_k8s_test_driver/tests/security_test.py b/tools/run_tests/xds_k8s_test_driver/tests/security_test.py index f087a9595f9..06364030720 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/security_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/security_test.py @@ -153,7 +153,7 @@ class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase): # Start the client, but don't wait for it to report a healthy channel. test_client: _XdsTestClient = self.startSecureTestClient( - test_server, wait_for_active_server_channel=False + test_server, wait_for_server_channel_ready=False ) self.assertClientCannotReachServerRepeatedly(test_client) @@ -205,7 +205,7 @@ class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase): # Start the client, but don't wait for it to report a healthy channel. test_client: _XdsTestClient = self.startSecureTestClient( - test_server, wait_for_active_server_channel=False + test_server, wait_for_server_channel_ready=False ) self.assertClientCannotReachServerRepeatedly(test_client)