xds-k8s driver: switch Backend Health Check from TCP to GRPC

pull/25409/head
Sergii Tkachenko 4 years ago
parent 7cbc96b274
commit 21871dcb88
  1. 31
      tools/run_tests/xds_k8s_test_driver/bin/run_td_setup.py
  2. 6
      tools/run_tests/xds_k8s_test_driver/bin/run_test_server.py
  3. 32
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py
  4. 33
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
  5. 11
      tools/run_tests/xds_k8s_test_driver/framework/test_app/server_app.py
  6. 9
      tools/run_tests/xds_k8s_test_driver/framework/xds_flags.py
  7. 37
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py

@ -41,6 +41,7 @@ from framework import xds_k8s_flags
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app import server_app
logger = logging.getLogger(__name__)
# Flags
@ -61,6 +62,9 @@ _SECURITY = flags.DEFINE_enum('security',
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
_DEFAULT_SECURE_MODE_MAINTENANCE_PORT = \
server_app.KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
def main(argv):
if len(argv) > 1:
@ -76,6 +80,7 @@ def main(argv):
# Test server
server_name = xds_flags.SERVER_NAME.value
server_port = xds_flags.SERVER_PORT.value
server_maintenance_port = xds_flags.SERVER_MAINTENANCE_PORT.value
server_xds_host = xds_flags.SERVER_XDS_HOST.value
server_xds_port = xds_flags.SERVER_XDS_PORT.value
@ -92,17 +97,23 @@ def main(argv):
project=project,
resource_prefix=namespace,
network=network)
if server_maintenance_port is None:
server_maintenance_port = _DEFAULT_SECURE_MODE_MAINTENANCE_PORT
try:
if command in ('create', 'cycle'):
logger.info('Create mode')
if security_mode is None:
logger.info('No security')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
elif security_mode == 'mtls':
logger.info('Setting up mtls')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
@ -115,7 +126,9 @@ def main(argv):
elif security_mode == 'tls':
logger.info('Setting up tls')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
@ -128,7 +141,9 @@ def main(argv):
elif security_mode == 'plaintext':
logger.info('Setting up plaintext')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
@ -143,7 +158,9 @@ def main(argv):
# Error case: server expects client mTLS cert,
# but client configured only for TLS
logger.info('Setting up mtls_error')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
td.setup_server_security(server_namespace=namespace,
server_name=server_name,
server_port=server_port,
@ -158,7 +175,9 @@ def main(argv):
# Error case: client does not authorize server
# because of mismatched SAN name.
logger.info('Setting up mtls_error')
td.setup_for_grpc(server_xds_host, server_xds_port)
td.setup_for_grpc(server_xds_host,
server_xds_port,
health_check_port=server_maintenance_port)
# Regular TLS setup, but with client policy configured using
# intentionality incorrect server_namespace.
td.setup_server_security(server_namespace=namespace,

@ -69,8 +69,10 @@ def main(argv):
if _CMD.value == 'run':
logger.info('Run server, secure_mode=%s', _SECURE.value)
server_runner.run(test_port=xds_flags.SERVER_PORT.value,
secure_mode=_SECURE.value)
server_runner.run(
test_port=xds_flags.SERVER_PORT.value,
maintenance_port=xds_flags.SERVER_MAINTENANCE_PORT.value,
secure_mode=_SECURE.value)
elif _CMD.value == 'cleanup':
logger.info('Cleanup server')

@ -45,23 +45,37 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
class HealthCheckProtocol(enum.Enum):
TCP = enum.auto()
GRPC = enum.auto()
class BackendServiceProtocol(enum.Enum):
HTTP2 = enum.auto()
GRPC = enum.auto()
def create_health_check_tcp(self,
name,
use_serving_port=False) -> GcpResource:
def create_health_check(self,
name: str,
protocol: HealthCheckProtocol,
*,
port: Optional[int] = None) -> GcpResource:
if protocol is self.HealthCheckProtocol.TCP:
health_check_field = 'tcpHealthCheck'
elif protocol is self.HealthCheckProtocol.GRPC:
health_check_field = 'grpcHealthCheck'
else:
raise TypeError(f'Unexpected Health Check protocol: {protocol}')
health_check_settings = {}
if use_serving_port:
if port is None:
health_check_settings['portSpecification'] = 'USE_SERVING_PORT'
else:
health_check_settings['portSpecification'] = 'USE_FIXED_PORT'
health_check_settings['port'] = port
return self._insert_resource(self.api.healthChecks(), {
'name': name,
'type': 'TCP',
'tcpHealthCheck': health_check_settings,
})
return self._insert_resource(
self.api.healthChecks(), {
'name': name,
'type': protocol.name,
health_check_field: health_check_settings,
})
def delete_health_check(self, name):
self._delete_resource(self.api.healthChecks(), 'healthCheck', name)

@ -26,6 +26,7 @@ HealthCheckProtocol = _ComputeV1.HealthCheckProtocol
ZonalGcpResource = _ComputeV1.ZonalGcpResource
BackendServiceProtocol = _ComputeV1.BackendServiceProtocol
_BackendGRPC = BackendServiceProtocol.GRPC
_HealthCheckGRPC = HealthCheckProtocol.GRPC
# Network Security
_NetworkSecurityV1Alpha1 = gcp.network_security.NetworkSecurityV1Alpha1
@ -83,15 +84,18 @@ class TrafficDirectorManager:
service_host,
service_port,
*,
backend_protocol: Optional[BackendServiceProtocol] = _BackendGRPC):
self.setup_backend_for_grpc(protocol=backend_protocol)
backend_protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
health_check_port: Optional[int] = None):
self.setup_backend_for_grpc(protocol=backend_protocol,
health_check_port=health_check_port)
self.setup_routing_rule_map_for_grpc(service_host, service_port)
def setup_backend_for_grpc(self,
*,
protocol: Optional[
BackendServiceProtocol] = _BackendGRPC):
self.create_health_check()
def setup_backend_for_grpc(
self,
*,
protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
health_check_port: Optional[int] = None):
self.create_health_check(port=health_check_port)
self.create_backend_service(protocol)
def setup_routing_rule_map_for_grpc(self, service_host, service_port):
@ -113,17 +117,20 @@ class TrafficDirectorManager:
def _ns_name(self, name):
return f'{self.resource_prefix}-{name}'
def create_health_check(self, protocol=HealthCheckProtocol.TCP):
def create_health_check(
self,
*,
protocol: Optional[HealthCheckProtocol] = _HealthCheckGRPC,
port: Optional[int] = None):
if self.health_check:
raise ValueError(f'Health check {self.health_check.name} '
'already created, delete it first')
if protocol is None:
protocol = _HealthCheckGRPC
name = self._ns_name(self.HEALTH_CHECK_NAME)
logger.info('Creating %s Health Check "%s"', protocol.name, name)
if protocol is HealthCheckProtocol.TCP:
resource = self.compute.create_health_check_tcp(
name, use_serving_port=True)
else:
raise ValueError('Unexpected protocol')
resource = self.compute.create_health_check(name, protocol, port=port)
self.health_check = resource
def delete_health_check(self, force=False):

@ -126,6 +126,9 @@ class XdsTestServer(framework.rpc.grpc.GrpcApp):
class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
DEFAULT_TEST_PORT = 8080
DEFAULT_MAINTENANCE_PORT = 8080
DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
def __init__(self,
k8s_namespace,
@ -176,7 +179,7 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
def run(self,
*,
test_port=8080,
test_port=DEFAULT_TEST_PORT,
maintenance_port=None,
secure_mode=False,
server_id=None,
@ -190,7 +193,11 @@ class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
# maintenance services can be reached independently from the security
# configuration under test.
if maintenance_port is None:
maintenance_port = test_port if not secure_mode else test_port + 1
if not secure_mode:
maintenance_port = self.DEFAULT_MAINTENANCE_PORT
else:
maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
if secure_mode and maintenance_port == test_port:
raise ValueError('port and maintenance_port must be different '
'when running test server in secure mode')

@ -37,7 +37,16 @@ SERVER_NAME = flags.DEFINE_string("server_name",
help="Server deployment and service name")
SERVER_PORT = flags.DEFINE_integer("server_port",
default=8080,
lower_bound=0,
upper_bound=65535,
help="Server test port")
SERVER_MAINTENANCE_PORT = flags.DEFINE_integer(
"server_maintenance_port",
lower_bound=0,
upper_bound=65535,
default=None,
help="Server port running maintenance services: health check, channelz, etc"
)
SERVER_XDS_HOST = flags.DEFINE_string("server_xds_host",
default='xds-test-server',
help="Test server xDS hostname")

@ -45,6 +45,8 @@ XdsTestClient = client_app.XdsTestClient
LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
_ChannelState = grpc_channelz.ChannelState
_timedelta = datetime.timedelta
_DEFAULT_SECURE_MODE_MAINTENANCE_PORT = \
server_app.KubernetesServerRunner.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
class XdsKubernetesTestCase(absltest.TestCase):
@ -68,6 +70,7 @@ class XdsKubernetesTestCase(absltest.TestCase):
cls.server_image = xds_k8s_flags.SERVER_IMAGE.value
cls.server_name = xds_flags.SERVER_NAME.value
cls.server_port = xds_flags.SERVER_PORT.value
cls.server_maintenance_port = xds_flags.SERVER_MAINTENANCE_PORT.value
cls.server_xds_host = xds_flags.SERVER_NAME.value
cls.server_xds_port = xds_flags.SERVER_XDS_PORT.value
@ -110,7 +113,9 @@ class XdsKubernetesTestCase(absltest.TestCase):
force_namespace=self.force_cleanup)
def setupTrafficDirectorGrpc(self):
self.td.setup_for_grpc(self.server_xds_host, self.server_xds_port)
self.td.setup_for_grpc(self.server_xds_host,
self.server_xds_port,
health_check_port=self.server_maintenance_port)
def setupServerBackends(self, *, wait_for_healthy_status=True):
# Load Backends
@ -199,9 +204,11 @@ class RegularXdsKubernetesTestCase(XdsKubernetesTestCase):
reuse_namespace=self.server_namespace == self.client_namespace)
def startTestServer(self, replica_count=1, **kwargs) -> XdsTestServer:
test_server = self.server_runner.run(replica_count=replica_count,
test_port=self.server_port,
**kwargs)
test_server = self.server_runner.run(
replica_count=replica_count,
test_port=self.server_port,
maintenance_port=self.server_maintenance_port,
**kwargs)
test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
return test_server
@ -220,6 +227,17 @@ class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase):
TLS = enum.auto()
PLAINTEXT = enum.auto()
@classmethod
def setUpClass(cls):
super().setUpClass()
if cls.server_maintenance_port is None:
# In secure mode, the maintenance port is different from
# the test port to keep it insecure, and make
# Health Checks and Channelz tests available.
# When not provided, use explicit numeric port value, so
# Backend Health Checks are created on a fixed port.
cls.server_maintenance_port = _DEFAULT_SECURE_MODE_MAINTENANCE_PORT
def setUp(self):
super().setUp()
@ -259,11 +277,12 @@ class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase):
debug_use_port_forwarding=self.debug_use_port_forwarding)
def startSecureTestServer(self, replica_count=1, **kwargs) -> XdsTestServer:
test_server = self.server_runner.run(replica_count=replica_count,
test_port=self.server_port,
maintenance_port=8081,
secure_mode=True,
**kwargs)
test_server = self.server_runner.run(
replica_count=replica_count,
test_port=self.server_port,
maintenance_port=self.server_maintenance_port,
secure_mode=True,
**kwargs)
test_server.set_xds_address(self.server_xds_host, self.server_xds_port)
return test_server

Loading…
Cancel
Save