[PSM Interop] Add a step to wait for an active XDS channel when starting the test client. (#34631)

* Logs when XDS channel check passed:
```
I1010 22:53:35.013700 140608769881920 client_app.py:278] [psm-grpc-client-9b5756c77-4gv6d] Waiting to report an active channel to trafficdirector.googleapis.com:443
I1010 22:53:38.879174 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: <Channel channel_id=10 target=trafficdirector.googleapis.com:443 call_started=2 calls_failed=2 state=READY>
I1010 22:53:49.002596 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: <Channel channel_id=10 target=trafficdirector.googleapis.com:443 call_started=5 calls_failed=5 state=READY>
I1010 22:53:59.130141 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: <Channel channel_id=10 target=trafficdirector.googleapis.com:443 call_started=6 calls_failed=6 state=READY>
I1010 22:54:09.253418 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: <Channel channel_id=10 target=trafficdirector.googleapis.com:443 call_started=7 calls_failed=7 state=READY>
I1010 22:54:19.386313 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: <Channel channel_id=10 target=trafficdirector.googleapis.com:443 call_started=8 calls_failed=8 state=READY>
I1010 22:54:35.517963 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: <Channel channel_id=10 target=trafficdirector.googleapis.com:443 call_started=8 calls_failed=8 state=READY>
I1010 22:55:00.638522 140608769881920 client_app.py:306] [psm-grpc-client-9b5756c77-4gv6d] xds channel: <Channel channel_id=10 target=trafficdirector.googleapis.com:443 call_started=10 calls_failed=8 state=READY>
I1010 22:55:00.638787 140608769881920 client_app.py:314] [psm-grpc-client-9b5756c77-4gv6d] Found an active XDS channel
I1010 22:55:00.638983 140608769881920 client_app.py:288] [psm-grpc-client-9b5756c77-4gv6d] Channel to trafficdirector.googleapis.com:443 transitioned to active
I1010 22:55:00.639290 140608769881920 client_app.py:240] [psm-grpc-client-9b5756c77-4gv6d] Waiting to report a READY channel to xds:///psm-grpc-server:8080
I1010 22:55:00.749331 140608769881920 client_app.py:347] [psm-grpc-client-9b5756c77-4gv6d] Server channel: <Channel channel_id=5 target=xds:///psm-grpc-server:8080 call_started=2215 calls_failed=1753 state=READY>
```
* Error Message when XDS channel check failed:
```
E1010 23:09:15.111581 140016347227968 base_testcase.py:60] ERROR Traceback in __main__.BaselineTest.test_traffic_director_grpc_setup:
Traceback (most recent call last):
  File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py", line 53, in test_traffic_director_grpc_setup
    test_client: _XdsTestClient = self.startTestClient(test_server)
  File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py", line 787, in startTestClient
    return self._start_test_client(test_server.xds_uri, **kwargs)
  File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py", line 798, in _start_test_client
    test_client.wait_for_active_xds_channel(
  File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py", line 171, in wait_for_active_xds_channel
    return self.wait_for_xds_channel_active(
  File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py", line 283, in wait_for_xds_channel_active
    channel = retryer(
  File "/usr/local/google/home/xuanwn/.pyenv/versions/310xds/lib/python3.10/site-packages/tenacity/__init__.py", line 423, in __call__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/google/home/xuanwn/.pyenv/versions/310xds/lib/python3.10/site-packages/tenacity/__init__.py", line 369, in iter
    return self.retry_error_callback(retry_state=retry_state)
  File "/usr/local/google/home/xuanwn/workspace/xds/grpc/tools/run_tests/xds_k8s_test_driver/framework/helpers/retryers.py", line 141, in error_handler
    raise RetryError(
framework.helpers.retryers.RetryError: Retry error calling framework.test_app.client_app.XdsTestClient.find_active_xds_channel: timeout 0:05:00 (h:mm:ss) exceeded. Last exception: ChannelNotActive: [psm-grpc-client-755fc5b468-qkh22] Client has no active channel with xds server trafficdirector.googleapis.com:443
```

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

---------

Co-authored-by: Sergii Tkachenko <hi@sergii.org>
pull/34774/head
Xuan Wang 1 year ago committed by GitHub
parent 299b4fe3fd
commit b16fa809e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      tools/run_tests/xds_k8s_test_driver/bin/run_channelz.py
  2. 2
      tools/run_tests/xds_k8s_test_driver/bin/run_ping_pong.py
  3. 2
      tools/run_tests/xds_k8s_test_driver/framework/bootstrap_generator_testcase.py
  4. 4
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py
  5. 143
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py
  6. 13
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  7. 4
      tools/run_tests/xds_k8s_test_driver/tests/security_test.py

@ -153,7 +153,7 @@ def debug_security_setup_negative(test_client):
def debug_security_setup_positive(test_client, test_server):
"""Debug positive cases: mTLS, TLS, Plaintext."""
test_client.wait_for_active_server_channel()
test_client.wait_for_server_channel_ready()
client_sock: _Socket = test_client.get_active_server_channel_socket()
server_sock: _Socket = test_server.get_server_socket_matching_client(
client_sock
@ -181,7 +181,7 @@ def debug_security_setup_positive(test_client, test_server):
def debug_basic_setup(test_client, test_server):
"""Show channel and server socket pair"""
test_client.wait_for_active_server_channel()
test_client.wait_for_server_channel_ready()
client_sock: _Socket = test_client.get_active_server_channel_socket()
server_sock: _Socket = test_server.get_server_socket_matching_client(
client_sock

@ -79,7 +79,7 @@ def get_client_rpc_stats(
def run_ping_pong(test_client: _XdsTestClient, num_rpcs: int):
test_client.wait_for_active_server_channel()
test_client.wait_for_server_channel_ready()
lb_stats = get_client_rpc_stats(test_client, num_rpcs)
for backend, rpcs_count in lb_stats.rpcs_by_peer.items():
if int(rpcs_count) < 1:

@ -178,5 +178,5 @@ class BootstrapGeneratorBaseTest(xds_k8s_testcase.XdsKubernetesBaseTestCase):
test_client = self.client_runner.run(
server_target=test_server.xds_uri, **kwargs
)
test_client.wait_for_active_server_channel()
test_client.wait_for_server_channel_ready()
return test_client

@ -107,6 +107,10 @@ class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper):
result = f"<Channel channel_id={channel.ref.channel_id}"
if channel.data.target:
result += f" target={channel.data.target}"
result += (
f" call_started={channel.data.calls_started}"
+ f" calls_failed={channel.data.calls_failed}"
)
result += f" state={ChannelState.Name(channel.data.state.state)}>"
return result

@ -44,6 +44,8 @@ _CsdsClient = grpc_csds.CsdsClient
# Used in a get_load_balancer_stats request to request all metadata.
REQ_LB_STATS_METADATA_ALL = ("*",)
# Fallback xDS server URI: wait_for_xds_channel_active() uses it when the
# caller does not pass xds_server_uri.
DEFAULT_TD_XDS_URI = "trafficdirector.googleapis.com:443"
class XdsTestClient(framework.rpc.grpc.GrpcApp):
"""
@ -129,7 +131,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
timeout_sec=timeout_sec
)
def wait_for_active_server_channel(
def wait_for_server_channel_ready(
self,
*,
timeout: Optional[_timedelta] = None,
@ -155,6 +157,33 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
)
raise
def wait_for_active_xds_channel(
    self,
    *,
    xds_server_uri: Optional[str] = None,
    timeout: Optional[_timedelta] = None,
    rpc_deadline: Optional[_timedelta] = None,
) -> _ChannelzChannel:
    """Wait until the channel to the xDS server is active, or time out.

    Delegates to wait_for_xds_channel_active(). When the wait fails because
    no active channel was found, a note is attached to the retry error
    saying that the client couldn't connect to the xDS control plane, and
    the error is re-raised.

    Args:
        xds_server_uri: Target of the xDS server channel to look for;
            passed through unchanged.
        timeout: Time budget for the whole wait; passed through unchanged.
        rpc_deadline: Deadline for the individual RPCs made while polling;
            passed through unchanged.

    Returns:
        The channel to the xDS server that was found to be active.

    Raises:
        retryers.RetryError: If the channel to the xDS server never
            transitioned to active within the allowed time.
    """
    try:
        return self.wait_for_xds_channel_active(
            xds_server_uri=xds_server_uri,
            timeout=timeout,
            rpc_deadline=rpc_deadline,
        )
    except retryers.RetryError as retry_err:
        # find_active_xds_channel() reports "no active channel" with
        # ChannelNotActive, which derives from GrpcApp.NotFound and is not
        # a ChannelNotFound. Checking only ChannelNotFound meant the note
        # below was never attached for the failure it was written for.
        if isinstance(
            retry_err.exception(),
            (self.ChannelNotActive, self.ChannelNotFound),
        ):
            retry_err.add_note(
                framework.errors.FrameworkError.note_blanket_error(
                    "The client couldn't connect to the xDS control plane."
                )
            )
        raise
def get_active_server_channel_socket(self) -> _ChannelzSocket:
channel = self.find_server_channel_with_state(
_ChannelzChannelState.READY
@ -230,6 +259,82 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
)
return channel
def wait_for_xds_channel_active(
    self,
    *,
    xds_server_uri: Optional[str] = None,
    timeout: Optional[_timedelta] = None,
    rpc_deadline: Optional[_timedelta] = None,
) -> _ChannelzChannel:
    """Poll until an active channel to the xDS server is found.

    Repeatedly calls find_active_xds_channel() through a retryer built by
    retryers.exponential_retryer_with_timeout().

    Args:
        xds_server_uri: Target of the xDS server channel. Falls back to
            DEFAULT_TD_XDS_URI when empty or None.
        timeout: Time budget for the whole wait. Defaults to 5 minutes.
        rpc_deadline: Deadline passed to each lookup. Defaults to
            30 seconds.

    Returns:
        The channel returned by the first successful lookup.
    """
    target_uri = xds_server_uri or DEFAULT_TD_XDS_URI

    # When polling for a state, prefer smaller wait times to avoid
    # exhausting all allowed time on a single long RPC.
    lookup_deadline = (
        _timedelta(seconds=30) if rpc_deadline is None else rpc_deadline
    )

    total_timeout = timeout
    if total_timeout is None:
        total_timeout = _timedelta(minutes=5)

    poll = retryers.exponential_retryer_with_timeout(
        wait_min=_timedelta(seconds=10),
        wait_max=_timedelta(seconds=25),
        timeout=total_timeout,
    )

    logger.info(
        "[%s] ADS: Waiting for successful calls to xDS control plane to %s",
        self.hostname,
        target_uri,
    )
    active_channel = poll(
        self.find_active_xds_channel,
        xds_server_uri=target_uri,
        rpc_deadline=lookup_deadline,
    )
    logger.info(
        "[%s] ADS: Detected successful calls to xDS control plane %s",
        self.hostname,
        target_uri,
    )
    return active_channel
def find_active_xds_channel(
    self,
    xds_server_uri: str,
    *,
    rpc_deadline: Optional[_timedelta] = None,
) -> _ChannelzChannel:
    """Return the first channel to the xDS server with successful calls.

    Iterates over the channels returned by get_server_channels() for
    xds_server_uri, logs each one, and returns the first that passes
    check_channel_successful_calls().

    Args:
        xds_server_uri: Target of the xDS server channel to look for.
        rpc_deadline: When given, forwarded to the underlying calls as
            "deadline_sec" (in seconds).

    Returns:
        The first channel that passed the successful-calls check.

    Raises:
        ChannelNotActive: If none of the channels passed the check.
    """
    rpc_params = {}
    if rpc_deadline is not None:
        rpc_params["deadline_sec"] = rpc_deadline.total_seconds()

    for channel in self.get_server_channels(xds_server_uri, **rpc_params):
        logger.info(
            "[%s] xDS control plane channel: %s",
            self.hostname,
            _ChannelzServiceClient.channel_repr(channel),
        )
        # Only the check itself is guarded: NotFound here means "this
        # channel is not active yet", nothing else should be swallowed.
        try:
            active_channel = self.check_channel_successful_calls(
                channel, **rpc_params
            )
        except self.NotFound:
            # Otherwise, keep searching.
            continue

        logger.info(
            "[%s] Detected successful calls to xDS control plane: %s",
            self.hostname,
            xds_server_uri,
        )
        return active_channel

    raise self.ChannelNotActive(
        f"[{self.hostname}] Client has no"
        f" active channel with xds server {xds_server_uri}",
        src=self.hostname,
        dst=xds_server_uri,
    )
def find_server_channel_with_state(
self,
expected_state: _ChannelzChannelState,
@ -312,6 +417,24 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
subchannels.append(subchannel)
return subchannels
def check_channel_successful_calls(
    self, channel: _ChannelzChannel, **kwargs
) -> _ChannelzChannel:
    """Check that the channel has at least one successful call.

    The channel is considered active when it is in the READY state and
    its calls_started counter is greater than its calls_failed counter.

    Args:
        channel: The channelz channel to inspect.
        **kwargs: Accepted for call-site compatibility; not used here.

    Returns:
        The same channel, when it is considered active.

    Raises:
        NotFound: If the channel is not READY or has no successful calls.
    """
    # Channel states are protobuf enum values, i.e. plain integers, so
    # they must be compared by value. Identity comparison (`is`) only
    # happens to work through CPython's small-integer caching.
    is_ready = channel.data.state.state == _ChannelzChannelState.READY
    if is_ready and channel.data.calls_started > channel.data.calls_failed:
        return channel

    raise self.NotFound(
        f"[{self.hostname}] Not found successful calls over the channel."
    )
class ChannelNotFound(framework.rpc.grpc.GrpcApp.NotFound):
"""Channel with expected status not found"""
@ -332,3 +455,21 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
self.dst = dst
self.expected_state = expected_state
super().__init__(message, src, dst, expected_state, **kwargs)
class ChannelNotActive(framework.rpc.grpc.GrpcApp.NotFound):
    """Raised when no active channel from src to dst was found."""

    # Both endpoints are kept on the instance for callers that inspect
    # the error.
    src: str
    dst: str

    def __init__(self, message: str, *, src: str, dst: str, **kwargs):
        """Store the endpoints and initialize the base error.

        Args:
            message: Human-readable description of the failure.
            src: Identifier of the side the channel originates from.
            dst: Target the channel was expected to reach.
            **kwargs: Extra keyword arguments forwarded to the base class.
        """
        self.src, self.dst = src, dst
        super().__init__(message, src, dst, **kwargs)

@ -826,13 +826,18 @@ class RegularXdsKubernetesTestCase(IsolatedXdsKubernetesTestCase):
self,
server_target: str,
*,
wait_for_active_ads_timeout: Optional[_timedelta] = None,
wait_for_active_channel_timeout: Optional[_timedelta] = None,
**kwargs,
) -> XdsTestClient:
test_client = self.client_runner.run(
server_target=server_target, **kwargs
)
test_client.wait_for_active_server_channel(
test_client.wait_for_active_xds_channel(
xds_server_uri=self.xds_server_uri,
timeout=wait_for_active_ads_timeout,
)
test_client.wait_for_server_channel_ready(
timeout=wait_for_active_channel_timeout,
)
return test_client
@ -956,14 +961,14 @@ class SecurityXdsKubernetesTestCase(IsolatedXdsKubernetesTestCase):
self,
test_server: XdsTestServer,
*,
wait_for_active_server_channel=True,
wait_for_server_channel_ready=True,
**kwargs,
) -> XdsTestClient:
test_client = self.client_runner.run(
server_target=test_server.xds_uri, secure_mode=True, **kwargs
)
if wait_for_active_server_channel:
test_client.wait_for_active_server_channel()
if wait_for_server_channel_ready:
test_client.wait_for_server_channel_ready()
return test_client
def assertTestAppSecurity(

@ -153,7 +153,7 @@ class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
# Start the client, but don't wait for it to report a healthy channel.
test_client: _XdsTestClient = self.startSecureTestClient(
test_server, wait_for_active_server_channel=False
test_server, wait_for_server_channel_ready=False
)
self.assertClientCannotReachServerRepeatedly(test_client)
@ -205,7 +205,7 @@ class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
# Start the client, but don't wait for it to report a healthy channel.
test_client: _XdsTestClient = self.startSecureTestClient(
test_server, wait_for_active_server_channel=False
test_server, wait_for_server_channel_ready=False
)
self.assertClientCannotReachServerRepeatedly(test_client)

Loading…
Cancel
Save