@ -15,6 +15,7 @@
""" Run xDS integration tests on GCP using Traffic Director. """
import argparse
import datetime
import googleapiclient . discovery
import grpc
import json
@ -41,6 +42,15 @@ from src.proto.grpc.testing import empty_pb2
from src . proto . grpc . testing import messages_pb2
from src . proto . grpc . testing import test_pb2_grpc
# Envoy protos provided by PyPI package xds-protos
# Needs to import the generated Python file to load descriptors
from envoy . service . status . v3 import csds_pb2
from envoy . service . status . v3 import csds_pb2_grpc
from envoy . extensions . filters . network . http_connection_manager . v3 import http_connection_manager_pb2
from envoy . extensions . filters . common . fault . v3 import fault_pb2
from envoy . extensions . filters . http . fault . v3 import fault_pb2
from envoy . extensions . filters . http . router . v3 import router_pb2
logger = logging . getLogger ( )
console_handler = logging . StreamHandler ( )
formatter = logging . Formatter ( fmt = ' %(asctime)s : %(levelname)-8s %(message)s ' )
@ -79,10 +89,11 @@ _ADDITIONAL_TEST_CASES = [
' circuit_breaking ' ,
' timeout ' ,
' fault_injection ' ,
' csds ' ,
]
# Test cases that require the V3 API. Skipped in older runs.
_V3_TEST_CASES = frozenset ( [ ' timeout ' , ' fault_injection ' ] )
_V3_TEST_CASES = frozenset ( [ ' timeout ' , ' fault_injection ' , ' csds ' ] )
# Test cases that require the alpha API. Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset ( [ ' timeout ' ] )
@ -363,6 +374,32 @@ def get_client_accumulated_stats():
return response
def get_client_xds_config_dump ( ) :
if CLIENT_HOSTS :
hosts = CLIENT_HOSTS
else :
hosts = [ ' localhost ' ]
for host in hosts :
server_address = ' %s : %d ' % ( host , args . stats_port )
with grpc . insecure_channel ( server_address ) as channel :
stub = csds_pb2_grpc . ClientStatusDiscoveryServiceStub ( channel )
logger . debug ( ' Fetching xDS config dump from %s ' , server_address )
response = stub . FetchClientStatus ( csds_pb2 . ClientStatusRequest ( ) ,
wait_for_ready = True ,
timeout = _CONNECTION_TIMEOUT_SEC )
logger . debug ( ' Fetched xDS config dump from %s ' , server_address )
if len ( response . config ) != 1 :
logger . error ( ' Unexpected number of ClientConfigs %d : %s ' ,
len ( response . config ) , response )
return None
else :
# Converting the ClientStatusResponse into JSON, because many
# fields are packed in google.protobuf.Any. It will require many
# duplicated code to unpack proto message and inspect values.
return json_format . MessageToDict (
response . config [ 0 ] , preserving_proto_field_name = True )
def configure_client ( rpc_types , metadata = [ ] , timeout_sec = None ) :
if CLIENT_HOSTS :
hosts = CLIENT_HOSTS
@ -1782,6 +1819,92 @@ def test_fault_injection(gcp, original_backend_service, instance_group):
set_validate_for_proxyless ( gcp , True )
def test_csds ( gcp , original_backend_service , instance_group , server_uri ) :
test_csds_timeout_s = datetime . timedelta ( minutes = 5 ) . total_seconds ( )
sleep_interval_between_attempts_s = datetime . timedelta (
seconds = 2 ) . total_seconds ( )
logger . info ( ' Running test_csds ' )
logger . info ( ' waiting for original backends to become healthy ' )
wait_for_healthy_backends ( gcp , original_backend_service , instance_group )
# Test case timeout: 5 minutes
deadline = time . time ( ) + test_csds_timeout_s
cnt = 0
while time . time ( ) < = deadline :
client_config = get_client_xds_config_dump ( )
logger . info ( ' test_csds attempt %d : received xDS config %s ' , cnt ,
json . dumps ( client_config , indent = 2 ) )
if client_config is not None :
# Got the xDS config dump, now validate it
ok = True
try :
if client_config [ ' node ' ] [ ' locality ' ] [ ' zone ' ] != args . zone :
logger . info ( ' Invalid zone %s != %s ' ,
client_config [ ' node ' ] [ ' locality ' ] [ ' zone ' ] ,
args . zone )
ok = False
seen = set ( )
for xds_config in client_config [ ' xds_config ' ] :
if ' listener_config ' in xds_config :
listener_name = xds_config [ ' listener_config ' ] [
' dynamic_listeners ' ] [ 0 ] [ ' active_state ' ] [ ' listener ' ] [
' name ' ]
if listener_name != server_uri :
logger . info ( ' Invalid Listener name %s != %s ' ,
listener_name , server_uri )
ok = False
else :
seen . add ( ' lds ' )
elif ' route_config ' in xds_config :
num_vh = len (
xds_config [ ' route_config ' ] [ ' dynamic_route_configs ' ]
[ 0 ] [ ' route_config ' ] [ ' virtual_hosts ' ] )
if num_vh < = 0 :
logger . info ( ' Invalid number of VirtualHosts %s ' ,
num_vh )
ok = False
else :
seen . add ( ' rds ' )
elif ' cluster_config ' in xds_config :
cluster_type = xds_config [ ' cluster_config ' ] [
' dynamic_active_clusters ' ] [ 0 ] [ ' cluster ' ] [ ' type ' ]
if cluster_type != ' EDS ' :
logger . info ( ' Invalid cluster type %s != EDS ' ,
cluster_type )
ok = False
else :
seen . add ( ' cds ' )
elif ' endpoint_config ' in xds_config :
sub_zone = xds_config [ " endpoint_config " ] [
" dynamic_endpoint_configs " ] [ 0 ] [ " endpoint_config " ] [
" endpoints " ] [ 0 ] [ " locality " ] [ " sub_zone " ]
if args . zone not in sub_zone :
logger . info ( ' Invalid endpoint sub_zone %s ' ,
sub_zone )
ok = False
else :
seen . add ( ' eds ' )
want = { ' lds ' , ' rds ' , ' cds ' , ' eds ' }
if seen == want :
logger . info ( ' Incomplete xDS config dump, seen= %s ' , seen )
ok = False
except :
logger . exception ( ' Error in xDS config dump: ' )
ok = False
finally :
if ok :
# Successfully fetched xDS config, and they looks good.
logger . info ( ' success ' )
return
logger . info ( ' test_csds attempt %d failed ' , cnt )
# Give the client some time to fetch xDS resources
time . sleep ( sleep_interval_between_attempts_s )
cnt + = 1
raise RuntimeError ( ' failed to receive valid xDS config ' )
def set_validate_for_proxyless ( gcp , validate_for_proxyless ) :
if not gcp . alpha_compute :
logger . debug (
@ -1838,7 +1961,7 @@ def is_primary_instance_group(gcp, instance_group):
def get_startup_script ( path_to_server_binary , service_port ) :
if path_to_server_binary :
return " nohup %s --port= %d 1>/dev/null & " % ( path_to_server_binary ,
return ' nohup %s --port= %d 1>/dev/null & ' % ( path_to_server_binary ,
service_port )
else :
return """ #!/bin/bash
@ -2737,6 +2860,8 @@ try:
test_timeout ( gcp , backend_service , instance_group )
elif test_case == ' fault_injection ' :
test_fault_injection ( gcp , backend_service , instance_group )
elif test_case == ' csds ' :
test_csds ( gcp , backend_service , instance_group , server_uri )
else :
logger . error ( ' Unknown test case: %s ' , test_case )
sys . exit ( 1 )