From f163b3897b8088616e1e12432a59319905e4e07c Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Wed, 13 Dec 2023 17:04:13 -0800 Subject: [PATCH] ADS: Fix the issue with channel not refreshing on the second check --- .../xds_k8s_test_driver/framework/rpc/grpc.py | 1 + .../framework/rpc/grpc_channelz.py | 22 ++++ .../framework/test_app/client_app.py | 124 +++++++++++------- 3 files changed, 96 insertions(+), 51 deletions(-) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py index 1f6f7483ddd..b67991115be 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py @@ -25,6 +25,7 @@ logger = logging.getLogger(__name__) # Type aliases Message = google.protobuf.message.Message +RpcError = grpc.RpcError class GrpcClientHelper: diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py index 59e14786dfd..df25e2e5aaf 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py @@ -30,6 +30,7 @@ logger = logging.getLogger(__name__) # Type aliases # Channel Channel = channelz_pb2.Channel +ChannelData = channelz_pb2.ChannelData ChannelConnectivityState = channelz_pb2.ChannelConnectivityState ChannelState = ChannelConnectivityState.State # pylint: disable=no-member _GetTopChannelsRequest = channelz_pb2.GetTopChannelsRequest @@ -109,6 +110,7 @@ class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper): result += f" target={channel.data.target}" result += ( f" call_started={channel.data.calls_started}" + + f" calls_succeeded={channel.data.calls_succeeded}" + f" calls_failed={channel.data.calls_failed}" ) result += f" state={ChannelState.Name(channel.data.state.state)}>" @@ -170,6 +172,26 @@ class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper): start = max(start, channel.ref.channel_id) yield channel + def get_channel(self, channel_id, **kwargs) -> Channel: + """Return a single Channel, otherwise raises RpcError.""" + response: channelz_pb2.GetChannelResponse + try: + response = self.call_unary_with_deadline( + rpc="GetChannel", + req=channelz_pb2.GetChannelRequest(channel_id=channel_id), + **kwargs, + ) + return response.channel + except grpc.RpcError as err: + if isinstance(err, grpc.Call): + # Translate NOT_FOUND into GrpcApp.NotFound. + if err.code() is grpc.StatusCode.NOT_FOUND: + raise framework.rpc.grpc.GrpcApp.NotFound( + f"Channel with channel_id {channel_id} not found", + ) + + raise + def list_servers(self, **kwargs) -> Iterator[Server]: """Iterate over all pages of all servers that exist in the process.""" start: int = -1 diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index 534431c97d0..ad4c7b9e9da 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -37,6 +37,7 @@ _XdsUpdateClientConfigureServiceClient = ( ) _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient _ChannelzChannel = grpc_channelz.Channel +_ChannelzChannelData = grpc_channelz.ChannelData _ChannelzChannelState = grpc_channelz.ChannelState _ChannelzSubchannel = grpc_channelz.Subchannel _ChannelzSocket = grpc_channelz.Socket @@ -307,7 +308,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): if rpc_deadline is not None: rpc_params["deadline_sec"] = rpc_deadline.total_seconds() - for channel in self.get_server_channels(xds_server_uri, **rpc_params): + for channel in self.find_channels(xds_server_uri, **rpc_params): logger.info( "[%s] xDS control plane channel: %s", self.hostname, @@ -315,38 +316,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): ) try: - channel_first_attempt = self.check_channel_successful_calls( + channel_upd = self.check_channel_in_flight_calls( channel, **rpc_params ) - # Address race where a call to the xDS control plane server has - # just started and a channelz request comes in before the call - # has had a chance to fail. - # With channels to the xDS control plane, the channel can be - # READY but the calls could be failing due to failure to fetch - # OAUTH2 token. To increase the confidence that we have a valid - # channel with working OAUTH2 tokens, we check whether the - # channel is in a READY state with active calls twice with an - # interval of 2 seconds between the two attempts. If the OAUTH2 - # token is not valid, the call would fail and be caught in - # either the first attempt, or the second attempt. It is - # possible that between the two attempts, a call fails and a new - # call is started, so we also test for equality between the - # started calls of the two channelz results. - # There still exists a possibility that a call fails on fetching - # OAUTH2 token after 2 seconds (maybe because there is a - # slowdown in the system.) If such a case is observed, consider - # increasing the interval from 2 seconds to 5 seconds. - time.sleep(2) - channel_second_attempt = self.check_channel_successful_calls( - channel, **rpc_params - ) - if ( - channel_first_attempt.data.calls_started - != channel_second_attempt.data.calls_started - ): - raise self.NotFound( - f"[{self.hostname}] Not found successful calls over the channel." - ) logger.info( "[%s] Detected successful calls to xDS control plane %s," " channel: %s", @@ -354,11 +326,17 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): xds_server_uri, _ChannelzServiceClient.channel_repr(channel), ) + return channel_upd except self.NotFound: - # Otherwise, keep searching. + # Continue checking other channels to the same target on + # not found. continue - - return channel + except framework.rpc.grpc.RpcError as err: + logger.debug( + f"Unexpected error while checking" + f" channel {channel.ref.channel_id}: {err}" + ) + raise raise self.ChannelNotActive( f"[{self.hostname}] Client has no" @@ -381,7 +359,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): expected_state_name: str = _ChannelzChannelState.Name(expected_state) target: str = self.server_target - for channel in self.get_server_channels(target, **rpc_params): + for channel in self.find_channels(target, **rpc_params): channel_state: _ChannelzChannelState = channel.data.state.state logger.info( "[%s] Server channel: %s", @@ -416,10 +394,12 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): expected_state=expected_state, ) - def get_server_channels( - self, server_target: str, **kwargs + def find_channels( + self, + target: str, + **rpc_params, ) -> Iterable[_ChannelzChannel]: - return self.channelz.find_channels_for_target(server_target, **kwargs) + return self.channelz.find_channels_for_target(target, **rpc_params) def find_subchannel_with_state( self, channel: _ChannelzChannel, state: _ChannelzChannelState, **kwargs @@ -449,23 +429,65 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp): subchannels.append(subchannel) return subchannels - def check_channel_successful_calls( - self, channel: _ChannelzChannel, **kwargs - ) -> _ChannelzChannel: - """Checks if the channel has any successful calls. - - We consider the channel is active if channel is in READY state and calls_started is - greater than calls_failed. + def check_channel_in_flight_calls( + self, + channel: _ChannelzChannel, + *, + wait_between_checks: Optional[_timedelta] = None, + **rpc_params, + ) -> Optional[_ChannelzChannel]: + """Checks if the channel has calls that started, but didn't complete. + + We consider the channel is active if channel is in READY state and + calls_started is greater than calls_failed. + + This method address race where a call to the xDS control plane server + has just started and a channelz request comes in before the call has + had a chance to fail. + + With channels to the xDS control plane, the channel can be READY but the + calls could be failing to initialize, f.e. due to a failure to fetch + OAUTH2 token. To increase the confidence that we have a valid channel + with working OAUTH2 tokens, we check whether the channel is in a READY + state with active calls twice with an interval of 2 seconds between the + two attempts. If the OAUTH2 token is not valid, the call would fail and + be caught in either the first attempt, or the second attempt. It is + possible that between the two attempts, a call fails and a new call is + started, so we also test for equality between the started calls of the + two channelz results. + + There still exists a possibility that a call fails on fetching OAUTH2 + token after 2 seconds (maybe because there is a slowdown in the + system.) If such a case is observed, consider increasing the interval + from 2 seconds to 5 seconds. + + Returns updated channel on success, or None on failure. """ + if not self.calc_calls_in_flight(channel): + return None + + if not wait_between_checks: + wait_between_checks = _timedelta(seconds=2) + + # Load the channel second time after the timeout. + time.sleep(wait_between_checks.total_seconds()) + channel_upd: _ChannelzChannel = self.channelz.get_channel( + channel.ref.channel_id, **rpc_params + ) if ( - channel.data.state.state is _ChannelzChannelState.READY - and channel.data.calls_started > channel.data.calls_failed + not self.calc_calls_in_flight(channel_upd) + or channel.data.calls_started != channel_upd.data.calls_started ): - return channel + return None + return channel_upd - raise self.NotFound( - f"[{self.hostname}] Not found successful calls over the channel." - ) + @classmethod + def calc_calls_in_flight(cls, channel: _ChannelzChannel) -> int: + cdata: _ChannelzChannelData = channel.data + if cdata.state.state is not _ChannelzChannelState.READY: + return 0 + + return cdata.calls_started - cdata.calls_succeeded - cdata.calls_failed class ChannelNotFound(framework.rpc.grpc.GrpcApp.NotFound): """Channel with expected status not found"""