ADS: Fix the issue with channel not refreshing on the second check

pull/35280/head
Sergii Tkachenko 1 year ago
parent 1bdf127308
commit f163b3897b
  1. 1
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc.py
  2. 22
      tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_channelz.py
  3. 124
      tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py

@ -25,6 +25,7 @@ logger = logging.getLogger(__name__)
# Type aliases # Type aliases
Message = google.protobuf.message.Message Message = google.protobuf.message.Message
RpcError = grpc.RpcError
class GrpcClientHelper: class GrpcClientHelper:

@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
# Type aliases # Type aliases
# Channel # Channel
Channel = channelz_pb2.Channel Channel = channelz_pb2.Channel
ChannelData = channelz_pb2.ChannelData
ChannelConnectivityState = channelz_pb2.ChannelConnectivityState ChannelConnectivityState = channelz_pb2.ChannelConnectivityState
ChannelState = ChannelConnectivityState.State # pylint: disable=no-member ChannelState = ChannelConnectivityState.State # pylint: disable=no-member
_GetTopChannelsRequest = channelz_pb2.GetTopChannelsRequest _GetTopChannelsRequest = channelz_pb2.GetTopChannelsRequest
@ -109,6 +110,7 @@ class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper):
result += f" target={channel.data.target}" result += f" target={channel.data.target}"
result += ( result += (
f" call_started={channel.data.calls_started}" f" call_started={channel.data.calls_started}"
+ f" calls_succeeded={channel.data.calls_succeeded}"
+ f" calls_failed={channel.data.calls_failed}" + f" calls_failed={channel.data.calls_failed}"
) )
result += f" state={ChannelState.Name(channel.data.state.state)}>" result += f" state={ChannelState.Name(channel.data.state.state)}>"
@ -170,6 +172,26 @@ class ChannelzServiceClient(framework.rpc.grpc.GrpcClientHelper):
start = max(start, channel.ref.channel_id) start = max(start, channel.ref.channel_id)
yield channel yield channel
def get_channel(self, channel_id, **kwargs) -> Channel:
"""Return a single Channel, otherwise raises RpcError."""
response: channelz_pb2.GetChannelResponse
try:
response = self.call_unary_with_deadline(
rpc="GetChannel",
req=channelz_pb2.GetChannelRequest(channel_id=channel_id),
**kwargs,
)
return response.channel
except grpc.RpcError as err:
if isinstance(err, grpc.Call):
# Translate NOT_FOUND into GrpcApp.NotFound.
if err.code() is grpc.StatusCode.NOT_FOUND:
raise framework.rpc.grpc.GrpcApp.NotFound(
f"Channel with channel_id {channel_id} not found",
)
raise
def list_servers(self, **kwargs) -> Iterator[Server]: def list_servers(self, **kwargs) -> Iterator[Server]:
"""Iterate over all pages of all servers that exist in the process.""" """Iterate over all pages of all servers that exist in the process."""
start: int = -1 start: int = -1

@ -37,6 +37,7 @@ _XdsUpdateClientConfigureServiceClient = (
) )
_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient _ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
_ChannelzChannel = grpc_channelz.Channel _ChannelzChannel = grpc_channelz.Channel
_ChannelzChannelData = grpc_channelz.ChannelData
_ChannelzChannelState = grpc_channelz.ChannelState _ChannelzChannelState = grpc_channelz.ChannelState
_ChannelzSubchannel = grpc_channelz.Subchannel _ChannelzSubchannel = grpc_channelz.Subchannel
_ChannelzSocket = grpc_channelz.Socket _ChannelzSocket = grpc_channelz.Socket
@ -307,7 +308,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
if rpc_deadline is not None: if rpc_deadline is not None:
rpc_params["deadline_sec"] = rpc_deadline.total_seconds() rpc_params["deadline_sec"] = rpc_deadline.total_seconds()
for channel in self.get_server_channels(xds_server_uri, **rpc_params): for channel in self.find_channels(xds_server_uri, **rpc_params):
logger.info( logger.info(
"[%s] xDS control plane channel: %s", "[%s] xDS control plane channel: %s",
self.hostname, self.hostname,
@ -315,38 +316,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
) )
try: try:
channel_first_attempt = self.check_channel_successful_calls( channel_upd = self.check_channel_in_flight_calls(
channel, **rpc_params channel, **rpc_params
) )
# Address race where a call to the xDS control plane server has
# just started and a channelz request comes in before the call
# has had a chance to fail.
# With channels to the xDS control plane, the channel can be
# READY but the calls could be failing due to failure to fetch
# OAUTH2 token. To increase the confidence that we have a valid
# channel with working OAUTH2 tokens, we check whether the
# channel is in a READY state with active calls twice with an
# interval of 2 seconds between the two attempts. If the OAUTH2
# token is not valid, the call would fail and be caught in
# either the first attempt, or the second attempt. It is
# possible that between the two attempts, a call fails and a new
# call is started, so we also test for equality between the
# started calls of the two channelz results.
# There still exists a possibility that a call fails on fetching
# OAUTH2 token after 2 seconds (maybe because there is a
# slowdown in the system.) If such a case is observed, consider
# increasing the interval from 2 seconds to 5 seconds.
time.sleep(2)
channel_second_attempt = self.check_channel_successful_calls(
channel, **rpc_params
)
if (
channel_first_attempt.data.calls_started
!= channel_second_attempt.data.calls_started
):
raise self.NotFound(
f"[{self.hostname}] Not found successful calls over the channel."
)
logger.info( logger.info(
"[%s] Detected successful calls to xDS control plane %s," "[%s] Detected successful calls to xDS control plane %s,"
" channel: %s", " channel: %s",
@ -354,11 +326,17 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
xds_server_uri, xds_server_uri,
_ChannelzServiceClient.channel_repr(channel), _ChannelzServiceClient.channel_repr(channel),
) )
return channel_upd
except self.NotFound: except self.NotFound:
# Otherwise, keep searching. # Continue checking other channels to the same target on
# not found.
continue continue
except framework.rpc.grpc.RpcError as err:
return channel logger.debug(
f"Unexpected error while checking"
f" channel {channel.ref.channel_id}: {err}"
)
raise
raise self.ChannelNotActive( raise self.ChannelNotActive(
f"[{self.hostname}] Client has no" f"[{self.hostname}] Client has no"
@ -381,7 +359,7 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
expected_state_name: str = _ChannelzChannelState.Name(expected_state) expected_state_name: str = _ChannelzChannelState.Name(expected_state)
target: str = self.server_target target: str = self.server_target
for channel in self.get_server_channels(target, **rpc_params): for channel in self.find_channels(target, **rpc_params):
channel_state: _ChannelzChannelState = channel.data.state.state channel_state: _ChannelzChannelState = channel.data.state.state
logger.info( logger.info(
"[%s] Server channel: %s", "[%s] Server channel: %s",
@ -416,10 +394,12 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
expected_state=expected_state, expected_state=expected_state,
) )
def get_server_channels( def find_channels(
self, server_target: str, **kwargs self,
target: str,
**rpc_params,
) -> Iterable[_ChannelzChannel]: ) -> Iterable[_ChannelzChannel]:
return self.channelz.find_channels_for_target(server_target, **kwargs) return self.channelz.find_channels_for_target(target, **rpc_params)
def find_subchannel_with_state( def find_subchannel_with_state(
self, channel: _ChannelzChannel, state: _ChannelzChannelState, **kwargs self, channel: _ChannelzChannel, state: _ChannelzChannelState, **kwargs
@ -449,23 +429,65 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
subchannels.append(subchannel) subchannels.append(subchannel)
return subchannels return subchannels
def check_channel_successful_calls( def check_channel_in_flight_calls(
self, channel: _ChannelzChannel, **kwargs self,
) -> _ChannelzChannel: channel: _ChannelzChannel,
"""Checks if the channel has any successful calls. *,
wait_between_checks: Optional[_timedelta] = None,
We consider the channel is active if channel is in READY state and calls_started is **rpc_params,
greater than calls_failed. ) -> Optional[_ChannelzChannel]:
"""Checks if the channel has calls that started, but didn't complete.
We consider the channel is active if channel is in READY state and
calls_started is greater than calls_failed.
This method address race where a call to the xDS control plane server
has just started and a channelz request comes in before the call has
had a chance to fail.
With channels to the xDS control plane, the channel can be READY but the
calls could be failing to initialize, f.e. due to a failure to fetch
OAUTH2 token. To increase the confidence that we have a valid channel
with working OAUTH2 tokens, we check whether the channel is in a READY
state with active calls twice with an interval of 2 seconds between the
two attempts. If the OAUTH2 token is not valid, the call would fail and
be caught in either the first attempt, or the second attempt. It is
possible that between the two attempts, a call fails and a new call is
started, so we also test for equality between the started calls of the
two channelz results.
There still exists a possibility that a call fails on fetching OAUTH2
token after 2 seconds (maybe because there is a slowdown in the
system.) If such a case is observed, consider increasing the interval
from 2 seconds to 5 seconds.
Returns updated channel on success, or None on failure.
""" """
if not self.calc_calls_in_flight(channel):
return None
if not wait_between_checks:
wait_between_checks = _timedelta(seconds=2)
# Load the channel second time after the timeout.
time.sleep(wait_between_checks.total_seconds())
channel_upd: _ChannelzChannel = self.channelz.get_channel(
channel.ref.channel_id, **rpc_params
)
if ( if (
channel.data.state.state is _ChannelzChannelState.READY not self.calc_calls_in_flight(channel_upd)
and channel.data.calls_started > channel.data.calls_failed or channel.data.calls_started != channel_upd.data.calls_started
): ):
return channel return None
return channel_upd
raise self.NotFound( @classmethod
f"[{self.hostname}] Not found successful calls over the channel." def calc_calls_in_flight(cls, channel: _ChannelzChannel) -> int:
) cdata: _ChannelzChannelData = channel.data
if cdata.state.state is not _ChannelzChannelState.READY:
return 0
return cdata.calls_started - cdata.calls_succeeded - cdata.calls_failed
class ChannelNotFound(framework.rpc.grpc.GrpcApp.NotFound): class ChannelNotFound(framework.rpc.grpc.GrpcApp.NotFound):
"""Channel with expected status not found""" """Channel with expected status not found"""

Loading…
Cancel
Save