Refactor xds test driver to prepare for additional test cases

The bulk of this change is the introduction of a GcpResource object to
track the created VMs, backend services, URL maps, etc. The additional
test cases for xDS integration greatly increase the complexity of the
resources required (e.g., multiple backend services, instance groups in
different zones), and the GcpResource construct makes tracking and cleaning
up these resources much cleaner.
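
For reference, the core of the construct is small: every created resource is
recorded as a GcpResource holding its name and URL, instance groups also carry
their zone, and a GcpState object owns the compute client, the project id, and
all resource handles so that a single clean_up() call can tear everything down
in reverse dependency order. A condensed sketch, mirroring the classes added in
the diff below (delete helpers elided):

class GcpResource(object):
    # Handle to a created GCP resource: its name plus its self-link URL.
    def __init__(self, name, url):
        self.name = name
        self.url = url

class InstanceGroup(object):
    # Instance groups are zonal, so the zone is recorded alongside name/URL.
    def __init__(self, name, url, zone):
        self.name = name
        self.url = url
        self.zone = zone

class GcpState(object):
    # Owns the compute client and every resource created during a test run.
    def __init__(self, compute, project):
        self.compute = compute
        self.project = project
        self.backend_services = []  # multiple services for the new test cases
        self.instance_groups = []   # may span more than one zone
        self.instance_template = None
        self.health_check = None

    def clean_up(self):
        # Delete in reverse dependency order: forwarding rule, target proxy,
        # URL map, backend services, firewall rule, health check, instance
        # groups, instance template.
        ...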
pull/22193/head
Eric Gribkoff 5 years ago
parent 5da8201d50
commit 1576de5d35
      tools/run_tests/run_xds_tests.py

@@ -51,11 +51,15 @@ argp.add_argument('--project_id', help='GCP project id')
argp.add_argument(
'--gcp_suffix',
default='',
help='Optional suffix for all generated GCP resource names. Useful to ensure '
'distinct names across test runs.')
help='Optional suffix for all generated GCP resource names. Useful to '
'ensure distinct names across test runs.')
argp.add_argument('--test_case',
default=None,
choices=['all', 'ping_pong', 'round_robin'])
choices=[
'all',
'ping_pong',
'round_robin',
])
argp.add_argument(
'--client_cmd',
default=None,
@@ -63,12 +67,15 @@ argp.add_argument(
'{service_host}, {service_port},{stats_port} and {qps} parameters using '
'str.format(), and generate the GRPC_XDS_BOOTSTRAP file.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
default='us-west1-b',
help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=10, help='Client QPS')
argp.add_argument(
'--wait_for_backend_sec',
default=900,
help='Time limit for waiting for created backend services to report healthy '
'when launching test suite')
default=600,
help='Time limit for waiting for created backend services to report '
'healthy when launching the test suite or updating GCP resources')
argp.add_argument(
'--keep_gcp_resources',
default=False,
@@ -81,13 +88,13 @@ argp.add_argument(
default=None,
type=str,
help=
'If provided, uses this file instead of retrieving via the GCP discovery API'
)
'If provided, uses this file instead of retrieving via the GCP discovery '
'API')
argp.add_argument('--network',
default='global/networks/default',
help='GCP network to use')
argp.add_argument('--service_port_range',
default='8080:8180',
default='8080:8100',
type=parse_port_range,
help='Listening port for created gRPC backends. Specified as '
'either a single int or as a range in the format min:max, in '
@@ -115,35 +122,18 @@ argp.add_argument(
argp.add_argument('--verbose',
help='verbose log output',
default=False,
action="store_true")
action='store_true')
args = argp.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
PROJECT_ID = args.project_id
ZONE = args.zone
QPS = args.qps
TEST_CASE = args.test_case
CLIENT_CMD = args.client_cmd
WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
TEMPLATE_NAME = 'test-template' + args.gcp_suffix
INSTANCE_GROUP_NAME = 'test-ig' + args.gcp_suffix
HEALTH_CHECK_NAME = 'test-hc' + args.gcp_suffix
FIREWALL_RULE_NAME = 'test-fw-rule' + args.gcp_suffix
BACKEND_SERVICE_NAME = 'test-backend-service' + args.gcp_suffix
URL_MAP_NAME = 'test-map' + args.gcp_suffix
SERVICE_HOST = 'grpc-test' + args.gcp_suffix
TARGET_PROXY_NAME = 'test-target-proxy' + args.gcp_suffix
FORWARDING_RULE_NAME = 'test-forwarding-rule' + args.gcp_suffix
KEEP_GCP_RESOURCES = args.keep_gcp_resources
TOLERATE_GCP_ERRORS = args.tolerate_gcp_errors
STATS_PORT = args.stats_port
INSTANCE_GROUP_SIZE = 2
WAIT_FOR_OPERATION_SEC = 60
NUM_TEST_RPCS = 10 * QPS
WAIT_FOR_STATS_SEC = 30
BOOTSTRAP_TEMPLATE = """
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 60
_INSTANCE_GROUP_SIZE = 2
_NUM_TEST_RPCS = 10 * args.qps
_WAIT_FOR_STATS_SEC = 60
_BOOTSTRAP_TEMPLATE = """
{{
"node": {{
"id": "{node_id}"
@@ -158,10 +148,20 @@ BOOTSTRAP_TEMPLATE = """
]
}}]
}}""" % args.xds_server
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
def get_client_stats(num_rpcs, timeout_sec):
with grpc.insecure_channel('localhost:%d' % STATS_PORT) as channel:
with grpc.insecure_channel('localhost:%d' % args.stats_port) as channel:
stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
request = messages_pb2.LoadBalancerStatsRequest()
request.num_rpcs = num_rpcs
@@ -177,12 +177,17 @@ def get_client_stats(num_rpcs, timeout_sec):
raise Exception('GetClientStats RPC failed')
def wait_until_only_given_backends_receive_load(backends, timeout_sec):
def wait_until_only_given_instances_receive_load(backends,
timeout_sec,
num_rpcs=100,
allow_failures=False):
start_time = time.time()
error_msg = None
logger.debug('Waiting for %d sec until backends %s receive load' %
(timeout_sec, backends))
while time.time() - start_time <= timeout_sec:
error_msg = None
stats = get_client_stats(max(len(backends), 1), timeout_sec)
stats = get_client_stats(num_rpcs, timeout_sec)
rpcs_by_peer = stats.rpcs_by_peer
for backend in backends:
if backend not in rpcs_by_peer:
@@ -190,52 +195,59 @@ def wait_until_only_given_backends_receive_load(backends, timeout_sec):
break
if not error_msg and len(rpcs_by_peer) > len(backends):
error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
if not allow_failures and stats.num_failures > 0:
error_msg = '%d RPCs failed' % stats.num_failures
if not error_msg:
return
raise Exception(error_msg)
def test_ping_pong(backends, num_rpcs, stats_timeout_sec):
def test_ping_pong(gcp, backend_service, instance_group):
wait_for_healthy_backends(gcp, backend_service, instance_group)
instance_names = get_instance_names(gcp, instance_group)
start_time = time.time()
error_msg = None
while time.time() - start_time <= stats_timeout_sec:
while time.time() - start_time <= _WAIT_FOR_STATS_SEC:
error_msg = None
stats = get_client_stats(num_rpcs, stats_timeout_sec)
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
rpcs_by_peer = stats.rpcs_by_peer
for backend in backends:
if backend not in rpcs_by_peer:
error_msg = 'Backend %s did not receive load' % backend
for instance in instance_names:
if instance not in rpcs_by_peer:
error_msg = 'Instance %s did not receive load' % instance
break
if not error_msg and len(rpcs_by_peer) > len(backends):
error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
if not error_msg and len(rpcs_by_peer) > len(instance_names):
error_msg = 'Unexpected instance received load: %s' % rpcs_by_peer
if not error_msg:
return
raise Exception(error_msg)
def test_round_robin(backends, num_rpcs, stats_timeout_sec):
def test_round_robin(gcp, backend_service, instance_group):
wait_for_healthy_backends(gcp, backend_service, instance_group)
instance_names = get_instance_names(gcp, instance_group)
threshold = 1
wait_until_only_given_backends_receive_load(backends, stats_timeout_sec)
stats = get_client_stats(num_rpcs, stats_timeout_sec)
wait_until_only_given_instances_receive_load(instance_names,
_WAIT_FOR_STATS_SEC)
stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
total_requests_received = sum(
[stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer])
if total_requests_received != num_rpcs:
if total_requests_received != _NUM_TEST_RPCS:
raise Exception('Unexpected RPC failures', stats)
expected_requests = total_requests_received / len(backends)
for backend in backends:
if abs(stats.rpcs_by_peer[backend] - expected_requests) > threshold:
expected_requests = total_requests_received / len(instance_names)
for instance in instance_names:
if abs(stats.rpcs_by_peer[instance] - expected_requests) > threshold:
raise Exception(
'RPC peer distribution differs from expected by more than %d for backend %s (%s)',
threshold, backend, stats)
'RPC peer distribution differs from expected by more than %d '
'for instance %s (%s)', threshold, instance, stats)
def create_instance_template(compute, project, name, grpc_port):
def create_instance_template(gcp, name, network, source_image):
config = {
'name': name,
'properties': {
'tags': {
'items': ['grpc-allow-healthcheck']
'items': ['allow-health-checks']
},
'machineType': 'e2-standard-2',
'serviceAccounts': [{
@@ -246,12 +258,12 @@ def create_instance_template(compute, project, name, grpc_port):
'accessConfigs': [{
'type': 'ONE_TO_ONE_NAT'
}],
'network': args.network
'network': network
}],
'disks': [{
'boot': True,
'initializeParams': {
'sourceImage': args.source_image
'sourceImage': source_image
}
}],
'metadata': {
@@ -260,7 +272,6 @@ def create_instance_template(compute, project, name, grpc_port):
'startup-script',
'value':
"""#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
@@ -271,40 +282,43 @@ pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true
nohup build/install/grpc-interop-testing/bin/xds-test-server --port=%d 1>/dev/null &"""
% grpc_port
% gcp.service_port
}]
}
}
}
result = compute.instanceTemplates().insert(project=project,
body=config).execute()
wait_for_global_operation(compute, project, result['name'])
return result['targetLink']
result = gcp.compute.instanceTemplates().insert(project=gcp.project,
body=config).execute()
wait_for_global_operation(gcp, result['name'])
gcp.instance_template = GcpResource(config['name'], result['targetLink'])
def create_instance_group(compute, project, zone, name, size, grpc_port,
template_url):
def add_instance_group(gcp, zone, name, size):
config = {
'name': name,
'instanceTemplate': template_url,
'instanceTemplate': gcp.instance_template.url,
'targetSize': size,
'namedPorts': [{
'name': 'grpc',
'port': grpc_port
'port': gcp.service_port
}]
}
result = compute.instanceGroupManagers().insert(project=project,
zone=zone,
body=config).execute()
wait_for_zone_operation(compute, project, zone, result['name'])
result = compute.instanceGroupManagers().get(
project=PROJECT_ID, zone=ZONE, instanceGroupManager=name).execute()
return result['instanceGroup']
result = gcp.compute.instanceGroupManagers().insert(project=gcp.project,
zone=zone,
body=config).execute()
wait_for_zone_operation(gcp, zone, result['name'])
result = gcp.compute.instanceGroupManagers().get(
project=gcp.project, zone=zone,
instanceGroupManager=config['name']).execute()
instance_group = InstanceGroup(config['name'], result['instanceGroup'],
zone)
gcp.instance_groups.append(instance_group)
return instance_group
def create_health_check(compute, project, name):
def create_health_check(gcp, name):
config = {
'name': name,
'type': 'TCP',
@@ -312,13 +326,13 @@ def create_health_check(compute, project, name):
'portName': 'grpc'
}
}
result = compute.healthChecks().insert(project=project,
body=config).execute()
wait_for_global_operation(compute, project, result['name'])
return result['targetLink']
result = gcp.compute.healthChecks().insert(project=gcp.project,
body=config).execute()
wait_for_global_operation(gcp, result['name'])
gcp.health_check = GcpResource(config['name'], result['targetLink'])
def create_health_check_firewall_rule(compute, project, name):
def create_health_check_firewall_rule(gcp, name):
config = {
'name': name,
'direction': 'INGRESS',
@@ -326,169 +340,169 @@ def create_health_check_firewall_rule(compute, project, name):
'IPProtocol': 'tcp'
}],
'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
'targetTags': ['grpc-allow-healthcheck'],
'targetTags': ['allow-health-checks'],
}
result = compute.firewalls().insert(project=project, body=config).execute()
wait_for_global_operation(compute, project, result['name'])
result = gcp.compute.firewalls().insert(project=gcp.project,
body=config).execute()
wait_for_global_operation(gcp, result['name'])
gcp.health_check_firewall_rule = GcpResource(config['name'],
result['targetLink'])
def create_backend_service(compute, project, name, health_check):
def add_backend_service(gcp, name):
config = {
'name': name,
'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
'healthChecks': [health_check],
'healthChecks': [gcp.health_check.url],
'portName': 'grpc',
'protocol': 'HTTP2'
}
result = compute.backendServices().insert(project=project,
body=config).execute()
wait_for_global_operation(compute, project, result['name'])
return result['targetLink']
result = gcp.compute.backendServices().insert(project=gcp.project,
body=config).execute()
wait_for_global_operation(gcp, result['name'])
backend_service = GcpResource(config['name'], result['targetLink'])
gcp.backend_services.append(backend_service)
return backend_service
def create_url_map(compute, project, name, backend_service_url, host_name):
path_matcher_name = 'path-matcher'
def create_url_map(gcp, name, backend_service, host_name):
config = {
'name': name,
'defaultService': backend_service_url,
'defaultService': backend_service.url,
'pathMatchers': [{
'name': path_matcher_name,
'defaultService': backend_service_url,
'name': _PATH_MATCHER_NAME,
'defaultService': backend_service.url,
}],
'hostRules': [{
'hosts': [host_name],
'pathMatcher': path_matcher_name
'pathMatcher': _PATH_MATCHER_NAME
}]
}
result = compute.urlMaps().insert(project=project, body=config).execute()
wait_for_global_operation(compute, project, result['name'])
return result['targetLink']
result = gcp.compute.urlMaps().insert(project=gcp.project,
body=config).execute()
wait_for_global_operation(gcp, result['name'])
gcp.url_map = GcpResource(config['name'], result['targetLink'])
def create_target_http_proxy(compute, project, name, url_map_url):
def create_target_http_proxy(gcp, name):
config = {
'name': name,
'url_map': url_map_url,
'url_map': gcp.url_map.url,
}
result = compute.targetHttpProxies().insert(project=project,
body=config).execute()
wait_for_global_operation(compute, project, result['name'])
return result['targetLink']
result = gcp.compute.targetHttpProxies().insert(project=gcp.project,
body=config).execute()
wait_for_global_operation(gcp, result['name'])
gcp.target_http_proxy = GcpResource(config['name'], result['targetLink'])
def create_global_forwarding_rule(compute, project, name, grpc_port,
target_http_proxy_url):
def create_global_forwarding_rule(gcp, name, port):
config = {
'name': name,
'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
'portRange': str(grpc_port),
'portRange': str(port),
'IPAddress': '0.0.0.0',
'network': args.network,
'target': target_http_proxy_url,
'target': gcp.target_http_proxy.url,
}
result = compute.globalForwardingRules().insert(project=project,
body=config).execute()
wait_for_global_operation(compute, project, result['name'])
def delete_global_forwarding_rule(compute, project, forwarding_rule):
try:
result = compute.globalForwardingRules().delete(
project=project, forwardingRule=forwarding_rule).execute()
wait_for_global_operation(compute, project, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
result = gcp.compute.globalForwardingRules().insert(project=gcp.project,
body=config).execute()
wait_for_global_operation(gcp, result['name'])
gcp.global_forwarding_rule = GcpResource(config['name'],
result['targetLink'])
def delete_target_http_proxy(compute, project, target_http_proxy):
def delete_global_forwarding_rule(gcp):
try:
result = compute.targetHttpProxies().delete(
project=project, targetHttpProxy=target_http_proxy).execute()
wait_for_global_operation(compute, project, result['name'])
result = gcp.compute.globalForwardingRules().delete(
project=gcp.project,
forwardingRule=gcp.global_forwarding_rule.name).execute()
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_url_map(compute, project, url_map):
def delete_target_http_proxy(gcp):
try:
result = compute.urlMaps().delete(project=project,
urlMap=url_map).execute()
wait_for_global_operation(compute, project, result['name'])
result = gcp.compute.targetHttpProxies().delete(
project=gcp.project,
targetHttpProxy=gcp.target_http_proxy.name).execute()
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_backend_service(compute, project, backend_service):
def delete_url_map(gcp):
try:
result = compute.backendServices().delete(
project=project, backendService=backend_service).execute()
wait_for_global_operation(compute, project, result['name'])
result = gcp.compute.urlMaps().delete(
project=gcp.project, urlMap=gcp.url_map.name).execute()
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_firewall(compute, project, firewall_rule):
try:
result = compute.firewalls().delete(project=project,
firewall=firewall_rule).execute()
wait_for_global_operation(compute, project, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_backend_services(gcp):
for backend_service in gcp.backend_services:
try:
result = gcp.compute.backendServices().delete(
project=gcp.project,
backendService=backend_service.name).execute()
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_health_check(compute, project, health_check):
def delete_firewall(gcp):
try:
result = compute.healthChecks().delete(
project=project, healthCheck=health_check).execute()
wait_for_global_operation(compute, project, result['name'])
result = gcp.compute.firewalls().delete(
project=gcp.project,
firewall=gcp.health_check_firewall_rule.name).execute()
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_instance_group(compute, project, zone, instance_group):
def delete_health_check(gcp):
try:
result = compute.instanceGroupManagers().delete(
project=project, zone=zone,
instanceGroupManager=instance_group).execute()
timeout_sec = 180 # Deleting an instance group can be slow
wait_for_zone_operation(compute,
project,
ZONE,
result['name'],
timeout_sec=timeout_sec)
result = gcp.compute.healthChecks().delete(
project=gcp.project, healthCheck=gcp.health_check.name).execute()
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_instance_template(compute, project, instance_template):
def delete_instance_groups(gcp):
for instance_group in gcp.instance_groups:
try:
result = gcp.compute.instanceGroupManagers().delete(
project=gcp.project,
zone=instance_group.zone,
instanceGroupManager=instance_group.name).execute()
timeout_sec = 180 # Deleting an instance group can be slow
wait_for_zone_operation(gcp,
instance_group.zone,
result['name'],
timeout_sec=timeout_sec)
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def delete_instance_template(gcp):
try:
result = compute.instanceTemplates().delete(
project=project, instanceTemplate=instance_template).execute()
wait_for_global_operation(compute, project, result['name'])
result = gcp.compute.instanceTemplates().delete(
project=gcp.project,
instanceTemplate=gcp.instance_template.name).execute()
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
def add_instances_to_backend(compute, project, backend_service, instance_group):
config = {
'backends': [{
'group': instance_group,
}],
}
result = compute.backendServices().patch(project=project,
backendService=backend_service,
body=config).execute()
wait_for_global_operation(compute, project, result['name'])
def wait_for_global_operation(compute,
project,
def wait_for_global_operation(gcp,
operation,
timeout_sec=WAIT_FOR_OPERATION_SEC):
timeout_sec=_WAIT_FOR_OPERATION_SEC):
start_time = time.time()
while time.time() - start_time <= timeout_sec:
result = compute.globalOperations().get(project=project,
operation=operation).execute()
result = gcp.compute.globalOperations().get(
project=gcp.project, operation=operation).execute()
if result['status'] == 'DONE':
if 'error' in result:
raise Exception(result['error'])
@@ -498,16 +512,14 @@ def wait_for_global_operation(compute,
timeout_sec)
def wait_for_zone_operation(compute,
project,
def wait_for_zone_operation(gcp,
zone,
operation,
timeout_sec=WAIT_FOR_OPERATION_SEC):
timeout_sec=_WAIT_FOR_OPERATION_SEC):
start_time = time.time()
while time.time() - start_time <= timeout_sec:
result = compute.zoneOperations().get(project=project,
zone=zone,
operation=operation).execute()
result = gcp.compute.zoneOperations().get(
project=gcp.project, zone=zone, operation=operation).execute()
if result['status'] == 'DONE':
if 'error' in result:
raise Exception(result['error'])
@@ -517,13 +529,16 @@ def wait_for_zone_operation(compute,
timeout_sec)
def wait_for_healthy_backends(compute, project_id, backend_service,
instance_group_url, timeout_sec):
def wait_for_healthy_backends(gcp,
backend_service,
instance_group,
timeout_sec=_WAIT_FOR_BACKEND_SEC):
start_time = time.time()
config = {'group': instance_group_url}
config = {'group': instance_group.url}
while time.time() - start_time <= timeout_sec:
result = compute.backendServices().getHealth(
project=project_id, backendService=backend_service,
result = gcp.compute.backendServices().getHealth(
project=gcp.project,
backendService=backend_service.name,
body=config).execute()
if 'healthStatus' in result:
healthy = True
@@ -538,15 +553,32 @@ def wait_for_healthy_backends(compute, project_id, backend_service,
(timeout_sec, result))
def start_xds_client(service_port):
cmd = CLIENT_CMD.format(service_host=SERVICE_HOST,
service_port=service_port,
stats_port=STATS_PORT,
qps=QPS)
def get_instance_names(gcp, instance_group):
instance_names = []
result = gcp.compute.instanceGroups().listInstances(
project=gcp.project,
zone=instance_group.zone,
instanceGroup=instance_group.name,
body={
'instanceState': 'ALL'
}).execute()
if 'items' not in result:
return []
for item in result['items']:
# listInstances() returns the full URL of the instance, which ends with
# the instance name. compute.instances().get() requires using the
# instance name (not the full URL) to look up instance details, so we
# just extract the name manually.
instance_name = item['instance'].split('/')[-1]
instance_names.append(instance_name)
return instance_names
def start_xds_client(cmd, service_port):
bootstrap_path = None
with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
bootstrap_file.write(
BOOTSTRAP_TEMPLATE.format(
_BOOTSTRAP_TEMPLATE.format(
node_id=socket.gethostname()).encode('utf-8'))
bootstrap_path = bootstrap_file.name
@@ -557,6 +589,53 @@ def start_xds_client(service_port):
return client_process
class InstanceGroup(object):
def __init__(self, name, url, zone):
self.name = name
self.url = url
self.zone = zone
class GcpResource(object):
def __init__(self, name, url):
self.name = name
self.url = url
class GcpState(object):
def __init__(self, compute, project):
self.compute = compute
self.project = project
self.health_check = None
self.health_check_firewall_rule = None
self.backend_services = []
self.url_map = None
self.target_http_proxy = None
self.global_forwarding_rule = None
self.service_port = None
self.instance_template = None
self.instance_groups = []
def clean_up(self):
if self.global_forwarding_rule:
delete_global_forwarding_rule(self)
if self.target_http_proxy:
delete_target_http_proxy(self)
if self.url_map:
delete_url_map(self)
delete_backend_services(self)
if self.health_check_firewall_rule:
delete_firewall(self)
if self.health_check:
delete_health_check(self)
delete_instance_groups(self)
if self.instance_template:
delete_instance_template(self)
if args.compute_discovery_document:
with open(args.compute_discovery_document, 'r') as discovery_doc:
compute = googleapiclient.discovery.build_from_document(
@@ -564,107 +643,141 @@ if args.compute_discovery_document:
else:
compute = googleapiclient.discovery.build('compute', 'v1')
service_port = None
client_process = None
try:
instance_group_url = None
gcp = GcpState(compute, args.project_id)
health_check_name = _BASE_HEALTH_CHECK_NAME + args.gcp_suffix
firewall_name = _BASE_FIREWALL_RULE_NAME + args.gcp_suffix
backend_service_name = _BASE_BACKEND_SERVICE_NAME + args.gcp_suffix
alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + args.gcp_suffix
url_map_name = _BASE_URL_MAP_NAME + args.gcp_suffix
service_host_name = _BASE_SERVICE_HOST + args.gcp_suffix
target_http_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix
forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + args.gcp_suffix
template_name = _BASE_TEMPLATE_NAME + args.gcp_suffix
instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix
same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix
secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix
try:
health_check_url = create_health_check(compute, PROJECT_ID,
HEALTH_CHECK_NAME)
create_health_check_firewall_rule(compute, PROJECT_ID,
FIREWALL_RULE_NAME)
backend_service_url = create_backend_service(compute, PROJECT_ID,
BACKEND_SERVICE_NAME,
health_check_url)
url_map_url = create_url_map(compute, PROJECT_ID, URL_MAP_NAME,
backend_service_url, SERVICE_HOST)
target_http_proxy_url = create_target_http_proxy(
compute, PROJECT_ID, TARGET_PROXY_NAME, url_map_url)
create_health_check(gcp, health_check_name)
create_health_check_firewall_rule(gcp, firewall_name)
backend_service = add_backend_service(gcp, backend_service_name)
alternate_backend_service = add_backend_service(
gcp, alternate_backend_service_name)
create_url_map(gcp, url_map_name, gcp.backend_services[0],
service_host_name)
create_target_http_proxy(gcp, target_http_proxy_name)
potential_service_ports = list(args.service_port_range)
random.shuffle(potential_service_ports)
for port in potential_service_ports:
try:
create_global_forwarding_rule(
compute,
PROJECT_ID,
FORWARDING_RULE_NAME,
port,
target_http_proxy_url,
)
service_port = port
create_global_forwarding_rule(gcp, forwarding_rule_name, port)
gcp.service_port = port
break
except googleapiclient.errors.HttpError as http_error:
logger.warning(
'Got error %s when attempting to create forwarding rule to port %d. Retrying with another port.'
% (http_error, port))
if not service_port:
'Got error %s when attempting to create forwarding rule to '
'port %d. Retrying with another port.' % (http_error, port))
if not gcp.service_port:
raise Exception('Failed to pick a service port in the range %s' %
args.service_port_range)
template_url = create_instance_template(compute, PROJECT_ID,
TEMPLATE_NAME, service_port)
instance_group_url = create_instance_group(compute, PROJECT_ID, ZONE,
INSTANCE_GROUP_NAME,
INSTANCE_GROUP_SIZE,
service_port, template_url)
add_instances_to_backend(compute, PROJECT_ID, BACKEND_SERVICE_NAME,
instance_group_url)
create_instance_template(gcp, template_name, args.network,
args.source_image)
instance_group = add_instance_group(gcp, args.zone, instance_group_name,
_INSTANCE_GROUP_SIZE)
patch_backend_instances(gcp, backend_service, [instance_group])
same_zone_instance_group = add_instance_group(
gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
secondary_zone_instance_group = add_instance_group(
gcp, args.secondary_zone, secondary_zone_instance_group_name,
_INSTANCE_GROUP_SIZE)
except googleapiclient.errors.HttpError as http_error:
if TOLERATE_GCP_ERRORS:
if args.tolerate_gcp_errors:
logger.warning(
'Failed to set up backends: %s. Continuing since '
'Failed to set up backends: %s. Attempting to continue since '
'--tolerate_gcp_errors=true', http_error)
if not gcp.instance_template:
result = compute.instanceTemplates().get(
project=args.project_id,
instanceTemplate=template_name).execute()
gcp.instance_template = GcpResource(template_name,
result['selfLink'])
if not gcp.backend_services:
result = compute.backendServices().get(
project=args.project_id,
backendService=backend_service_name).execute()
backend_service = GcpResource(backend_service_name,
result['selfLink'])
gcp.backend_services.append(backend_service)
result = compute.backendServices().get(
project=args.project_id,
backendService=alternate_backend_service_name).execute()
alternate_backend_service = GcpResource(
alternate_backend_service_name, result['selfLink'])
gcp.backend_services.append(alternate_backend_service)
if not gcp.instance_groups:
result = compute.instanceGroups().get(
project=args.project_id,
zone=args.zone,
instanceGroup=instance_group_name).execute()
instance_group = InstanceGroup(instance_group_name,
result['selfLink'], args.zone)
gcp.instance_groups.append(instance_group)
result = compute.instanceGroups().get(
project=args.project_id,
zone=args.zone,
instanceGroup=same_zone_instance_group_name).execute()
same_zone_instance_group = InstanceGroup(
same_zone_instance_group_name, result['selfLink'],
args.zone)
gcp.instance_groups.append(same_zone_instance_group)
result = compute.instanceGroups().get(
project=args.project_id,
zone=args.secondary_zone,
instanceGroup=secondary_zone_instance_group_name).execute()
secondary_zone_instance_group = InstanceGroup(
secondary_zone_instance_group_name, result['selfLink'],
args.secondary_zone)
gcp.instance_groups.append(secondary_zone_instance_group)
if not gcp.health_check:
result = compute.healthChecks().get(
project=args.project_id,
healthCheck=health_check_name).execute()
gcp.health_check = GcpResource(health_check_name,
result['selfLink'])
if not gcp.url_map:
result = compute.urlMaps().get(project=args.project_id,
urlMap=url_map_name).execute()
gcp.url_map = GcpResource(url_map_name, result['selfLink'])
if not gcp.service_port:
gcp.service_port = args.service_port_range[0]
logger.warning('Using arbitrary service port in range: %d' %
gcp.service_port)
else:
raise http_error
if instance_group_url is None:
# Look up the instance group URL, which may be unset if we are running
# with --tolerate_gcp_errors=true.
result = compute.instanceGroups().get(
project=PROJECT_ID, zone=ZONE,
instanceGroup=INSTANCE_GROUP_NAME).execute()
instance_group_url = result['selfLink']
wait_for_healthy_backends(compute, PROJECT_ID, BACKEND_SERVICE_NAME,
instance_group_url, WAIT_FOR_BACKEND_SEC)
backends = []
result = compute.instanceGroups().listInstances(
project=PROJECT_ID,
zone=ZONE,
instanceGroup=INSTANCE_GROUP_NAME,
body={
'instanceState': 'ALL'
}).execute()
for item in result['items']:
# listInstances() returns the full URL of the instance, which ends with
# the instance name. compute.instances().get() requires using the
# instance name (not the full URL) to look up instance details, so we
# just extract the name manually.
instance_name = item['instance'].split('/')[-1]
backends.append(instance_name)
client_process = start_xds_client(service_port)
if TEST_CASE == 'all':
test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC)
test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC)
elif TEST_CASE == 'ping_pong':
test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC)
elif TEST_CASE == 'round_robin':
test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC)
wait_for_healthy_backends(gcp, backend_service, instance_group)
cmd = args.client_cmd.format(service_host=service_host_name,
service_port=gcp.service_port,
stats_port=args.stats_port,
qps=args.qps)
client_process = start_xds_client(cmd, gcp.service_port)
if args.test_case == 'all':
test_ping_pong(gcp, backend_service, instance_group)
test_round_robin(gcp, backend_service, instance_group)
elif args.test_case == 'ping_pong':
test_ping_pong(gcp, backend_service, instance_group)
elif args.test_case == 'round_robin':
test_round_robin(gcp, backend_service, instance_group)
else:
logger.error('Unknown test case: %s', TEST_CASE)
logger.error('Unknown test case: %s', args.test_case)
sys.exit(1)
finally:
if client_process:
client_process.terminate()
if not KEEP_GCP_RESOURCES:
if not args.keep_gcp_resources:
logger.info('Cleaning up GCP resources. This may take some time.')
delete_global_forwarding_rule(compute, PROJECT_ID, FORWARDING_RULE_NAME)
delete_target_http_proxy(compute, PROJECT_ID, TARGET_PROXY_NAME)
delete_url_map(compute, PROJECT_ID, URL_MAP_NAME)
delete_backend_service(compute, PROJECT_ID, BACKEND_SERVICE_NAME)
delete_firewall(compute, PROJECT_ID, FIREWALL_RULE_NAME)
delete_health_check(compute, PROJECT_ID, HEALTH_CHECK_NAME)
delete_instance_group(compute, PROJECT_ID, ZONE, INSTANCE_GROUP_NAME)
delete_instance_template(compute, PROJECT_ID, TEMPLATE_NAME)
gcp.clean_up()
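
To illustrate how the refactor is meant to be used, here is a hedged sketch
(not part of the commit) of the shape a future test case could take with the
new helpers. The scenario and the exact behavior of patch_backend_instances
are assumptions based on its call site above, not something this diff defines:

# Hypothetical follow-on test case (sketch only), composing the refactored
# helpers. Assumes patch_backend_instances(gcp, backend_service, groups)
# points the backend service at the given instance groups.
def test_secondary_locality(gcp, backend_service, primary_instance_group,
                            secondary_zone_instance_group):
    patch_backend_instances(
        gcp, backend_service,
        [primary_instance_group, secondary_zone_instance_group])
    wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
    # While the primary locality is healthy, only its instances should
    # receive traffic.
    primary_names = get_instance_names(gcp, primary_instance_group)
    wait_until_only_given_instances_receive_load(primary_names,
                                                 _WAIT_FOR_STATS_SEC)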
