xds_k8s_test_driver: Add Authz interop tests (#27639)

Tests defined in go/grpc-proxyless-authz-interop, but the purpose of
each should be pretty obvious just by reading authz_test.py.
pull/27678/head^2
Eric Anderson 3 years ago committed by GitHub
parent 1d765b28f0
commit 07a8425849
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/network_security.py
  2. 29
      tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py
  3. 41
      tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py
  4. 2
      tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server-secure.deployment.yaml
  5. 314
      tools/run_tests/xds_k8s_test_driver/tests/authz_test.py

@ -68,6 +68,26 @@ class ClientTlsPolicy:
update_time=response['updateTime'])
@dataclasses.dataclass(frozen=True)
class AuthorizationPolicy:
url: str
name: str
update_time: str
create_time: str
action: str
rules: list
@classmethod
def from_response(cls, name: str,
response: Dict[str, Any]) -> 'AuthorizationPolicy':
return cls(name=name,
url=response['name'],
create_time=response['createTime'],
update_time=response['updateTime'],
action=response['action'],
rules=response.get('rules', []))
class _NetworkSecurityBase(gcp.api.GcpStandardCloudApiResource,
metaclass=abc.ABCMeta):
"""Base class for NetworkSecurity APIs."""
@ -103,6 +123,7 @@ class NetworkSecurityV1Beta1(_NetworkSecurityBase):
SERVER_TLS_POLICIES = 'serverTlsPolicies'
CLIENT_TLS_POLICIES = 'clientTlsPolicies'
AUTHZ_POLICIES = 'authorizationPolicies'
@property
def api_version(self) -> str:
@ -142,6 +163,23 @@ class NetworkSecurityV1Beta1(_NetworkSecurityBase):
collection=self._api_locations.clientTlsPolicies(),
full_name=self.resource_full_name(name, self.CLIENT_TLS_POLICIES))
def create_authz_policy(self, name: str, body: dict) -> GcpResource:
return self._create_resource(
collection=self._api_locations.authorizationPolicies(),
body=body,
authorizationPolicyId=name)
def get_authz_policy(self, name: str) -> ClientTlsPolicy:
response = self._get_resource(
collection=self._api_locations.authorizationPolicies(),
full_name=self.resource_full_name(name, self.AUTHZ_POLICIES))
return ClientTlsPolicy.from_response(name, response)
def delete_authz_policy(self, name: str) -> bool:
return self._delete_resource(
collection=self._api_locations.authorizationPolicies(),
full_name=self.resource_full_name(name, self.AUTHZ_POLICIES))
class NetworkSecurityV1Alpha1(NetworkSecurityV1Beta1):
"""NetworkSecurity API v1alpha1.

@ -35,6 +35,7 @@ _HealthCheckGRPC = HealthCheckProtocol.GRPC
_NetworkSecurityV1Beta1 = gcp.network_security.NetworkSecurityV1Beta1
ServerTlsPolicy = gcp.network_security.ServerTlsPolicy
ClientTlsPolicy = gcp.network_security.ClientTlsPolicy
AuthorizationPolicy = gcp.network_security.AuthorizationPolicy
# Network Services
_NetworkServicesV1Alpha1 = gcp.network_services.NetworkServicesV1Alpha1
@ -644,6 +645,7 @@ class TrafficDirectorAppNetManager(TrafficDirectorManager):
class TrafficDirectorSecureManager(TrafficDirectorManager):
SERVER_TLS_POLICY_NAME = "server-tls-policy"
CLIENT_TLS_POLICY_NAME = "client-tls-policy"
AUTHZ_POLICY_NAME = "authz-policy"
ENDPOINT_POLICY = "endpoint-policy"
CERTIFICATE_PROVIDER_INSTANCE = "google_cloud_private_spiffe"
@ -674,6 +676,7 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
# Managed resources
self.server_tls_policy: Optional[ServerTlsPolicy] = None
self.client_tls_policy: Optional[ClientTlsPolicy] = None
self.authz_policy: Optional[AuthorizationPolicy] = None
self.endpoint_policy: Optional[EndpointPolicy] = None
def setup_server_security(self,
@ -704,6 +707,7 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
self.delete_endpoint_policy(force=force)
self.delete_server_tls_policy(force=force)
self.delete_client_tls_policy(force=force)
self.delete_authz_policy(force=force)
def create_server_tls_policy(self, *, tls, mtls):
name = self.make_resource_name(self.SERVER_TLS_POLICY_NAME)
@ -738,6 +742,29 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
self.netsec.delete_server_tls_policy(name)
self.server_tls_policy = None
def create_authz_policy(self, *, action: str, rules: list):
name = self.make_resource_name(self.AUTHZ_POLICY_NAME)
logger.info('Creating Authz Policy %s', name)
policy = {
"action": action,
"rules": rules,
}
self.netsec.create_authz_policy(name, policy)
self.authz_policy = self.netsec.get_authz_policy(name)
logger.debug('Authz Policy loaded: %r', self.authz_policy)
def delete_authz_policy(self, force=False):
if force:
name = self.make_resource_name(self.AUTHZ_POLICY_NAME)
elif self.authz_policy:
name = self.authz_policy.name
else:
return
logger.info('Deleting Authz Policy %s', name)
self.netsec.delete_authz_policy(name)
self.authz_policy = None
def create_endpoint_policy(self, *, server_namespace: str, server_name: str,
server_port: int) -> None:
name = self.make_resource_name(self.ENDPOINT_POLICY)
@ -764,6 +791,8 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
logger.warning(
'Creating Endpoint Policy %s with '
'no Server TLS policy attached', name)
if self.authz_policy:
config["authorizationPolicy"] = self.authz_policy.name
self.netsvc.create_endpoint_policy(name, config)
self.endpoint_policy = self.netsvc.get_endpoint_policy(name)

@ -22,6 +22,7 @@ from typing import List, Optional, Tuple
from absl import flags
from absl.testing import absltest
from google.protobuf import json_format
import grpc
from framework import xds_flags
from framework import xds_k8s_flags
@ -240,6 +241,46 @@ class XdsKubernetesTestCase(absltest.TestCase, metaclass=abc.ABCMeta):
0,
msg=f'Expected all RPCs to succeed: {failed} of {num_rpcs} failed')
@staticmethod
def diffAccumulatedStatsPerMethod(
before: grpc_testing.LoadBalancerAccumulatedStatsResponse,
after: grpc_testing.LoadBalancerAccumulatedStatsResponse):
"""Only diffs stats_per_method, as the other fields are deprecated."""
diff = grpc_testing.LoadBalancerAccumulatedStatsResponse()
for method, method_stats in after.stats_per_method.items():
for status, count in method_stats.result.items():
count -= before.stats_per_method[method].result[status]
if count < 0:
raise AssertionError("Diff of count shouldn't be negative")
if count > 0:
diff.stats_per_method[method].result[status] = count
return diff
def assertRpcStatusCodes(self, test_client: XdsTestClient, *,
status_code: grpc.StatusCode, duration: _timedelta,
method: str) -> None:
"""Assert all RPCs for a method are completing with a certain status."""
# Sending with pre-set QPS for a period of time
before_stats = test_client.get_load_balancer_accumulated_stats()
logging.info(
'Received LoadBalancerAccumulatedStatsResponse from test client %s: before:\n%s',
test_client.ip, before_stats)
time.sleep(duration.total_seconds())
after_stats = test_client.get_load_balancer_accumulated_stats()
logging.info(
'Received LoadBalancerAccumulatedStatsResponse from test client %s: after:\n%s',
test_client.ip, after_stats)
diff_stats = self.diffAccumulatedStatsPerMethod(before_stats,
after_stats)
stats = diff_stats.stats_per_method[method]
status = status_code.value[0]
for found_status, count in stats.result.items():
if found_status != status and count > 0:
self.fail(f"Expected only status {status} but found status "
f"{found_status} for method {method}:\n{diff_stats}")
self.assertGreater(stats.result[status_code.value[0]], 0)
def assertRpcsEventuallyGoToGivenServers(self,
test_client: XdsTestClient,
servers: List[XdsTestServer],

@ -42,6 +42,8 @@ spec:
# TODO(sergiitk): this should be conditional for if version < v1.37.x
- name: GRPC_XDS_EXPERIMENTAL_NEW_SERVER_API
value: "true"
- name: GRPC_XDS_EXPERIMENTAL_RBAC
value: "true"
volumeMounts:
- mountPath: /tmp/grpc-xds/
name: grpc-td-conf

@ -0,0 +1,314 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from typing import Optional
from absl import flags
from absl.testing import absltest
import grpc
from framework import xds_k8s_testcase
flags.adopt_module_key_flags(xds_k8s_testcase)
# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode
_SAMPLE_DURATION = datetime.timedelta(seconds=0.5)
class AuthzTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
RPC_TYPE_CYCLE = {
'UNARY_CALL': 'EMPTY_CALL',
'EMPTY_CALL': 'UNARY_CALL',
}
def setUp(self):
super().setUp()
self.next_rpc_type: Optional[int] = None
def authz_rules(self):
return [
{
"destinations": {
"hosts": [f"*:{self.server_xds_port}"],
"ports": [self.server_port],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "host-wildcard",
},
},
},
{
"destinations": {
"hosts": [f"*:{self.server_xds_port}"],
"ports": [self.server_port],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "header-regex-a+",
},
},
},
{
"destinations": [{
"hosts": [f"{self.server_xds_host}:{self.server_xds_port}"],
"ports": [self.server_port],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "host-match1",
},
}, {
"hosts": [
f"a-not-it.com:{self.server_xds_port}",
f"{self.server_xds_host}:{self.server_xds_port}",
"z-not-it.com:1",
],
"ports": [1, self.server_port, 65535],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "host-match2",
},
}],
},
{
"destinations": {
"hosts": [
f"not-the-host:{self.server_xds_port}",
"not-the-host",
],
"ports": [self.server_port],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "never-match-host",
},
},
},
{
"destinations": {
"hosts": [f"*:{self.server_xds_port}"],
"ports": [1],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "never-match-port",
},
},
},
{
"sources": {
"principals": ["*"],
},
"destinations": {
"hosts": [f"*:{self.server_xds_port}"],
"ports": [self.server_port],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "principal-present",
},
},
},
{
"sources": [{
"principals": [
f"spiffe://{self.project}.svc.id.goog/not/the/client",
],
}, {
"principals": [
f"spiffe://{self.project}.svc.id.goog/not/the/client",
f"spiffe://{self.project}.svc.id.goog/ns/"
f"{self.client_namespace}/sa/{self.client_name}",
],
}],
"destinations": {
"hosts": [f"*:{self.server_xds_port}"],
"ports": [self.server_port],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "match-principal",
},
},
},
{
"sources": {
"principals": [
f"spiffe://{self.project}.svc.id.goog/not/the/client",
],
},
"destinations": {
"hosts": [f"*:{self.server_xds_port}"],
"ports": [self.server_port],
"httpHeaderMatch": {
"headerName": "test",
"regexMatch": "never-match-principal",
},
},
},
]
def configure_and_assert(self, test_client: _XdsTestClient,
test_metadata_val: Optional[str],
status_code: grpc.StatusCode) -> None:
# Swap method type every sub-test to avoid mixing results
rpc_type = self.next_rpc_type
if rpc_type is None:
stats = test_client.get_load_balancer_accumulated_stats()
for t in self.RPC_TYPE_CYCLE:
if not stats.stats_per_method[t].rpcs_started:
rpc_type = t
self.assertIsNotNone(rpc_type, "All RPC types already used")
self.next_rpc_type = self.RPC_TYPE_CYCLE[rpc_type]
metadata = None
if test_metadata_val is not None:
metadata = ((rpc_type, "test", test_metadata_val),)
test_client.update_config.configure(rpc_types=[rpc_type],
metadata=metadata)
self.assertRpcStatusCodes(test_client,
status_code=status_code,
duration=_SAMPLE_DURATION,
method=rpc_type)
def test_plaintext_allow(self) -> None:
self.setupTrafficDirectorGrpc()
self.td.create_authz_policy(action='ALLOW', rules=self.authz_rules())
self.setupSecurityPolicies(server_tls=False,
server_mtls=False,
client_tls=False,
client_mtls=False)
test_server: _XdsTestServer = self.startSecureTestServer()
self.setupServerBackends()
test_client: _XdsTestClient = self.startSecureTestClient(test_server)
with self.subTest('01_host_wildcard'):
self.configure_and_assert(test_client, 'host-wildcard',
grpc.StatusCode.OK)
with self.subTest('02_no_match'):
self.configure_and_assert(test_client, 'no-such-rule',
grpc.StatusCode.PERMISSION_DENIED)
self.configure_and_assert(test_client, None,
grpc.StatusCode.PERMISSION_DENIED)
with self.subTest('03_header_regex'):
self.configure_and_assert(test_client, 'header-regex-a',
grpc.StatusCode.OK)
self.configure_and_assert(test_client, 'header-regex-aa',
grpc.StatusCode.OK)
self.configure_and_assert(test_client, 'header-regex-',
grpc.StatusCode.PERMISSION_DENIED)
self.configure_and_assert(test_client, 'header-regex-ab',
grpc.StatusCode.PERMISSION_DENIED)
self.configure_and_assert(test_client, 'aheader-regex-a',
grpc.StatusCode.PERMISSION_DENIED)
with self.subTest('04_host_match'):
self.configure_and_assert(test_client, 'host-match1',
grpc.StatusCode.OK)
self.configure_and_assert(test_client, 'host-match2',
grpc.StatusCode.OK)
with self.subTest('05_never_match_host'):
self.configure_and_assert(test_client, 'never-match-host',
grpc.StatusCode.PERMISSION_DENIED)
with self.subTest('06_never_match_port'):
self.configure_and_assert(test_client, 'never-match-port',
grpc.StatusCode.PERMISSION_DENIED)
with self.subTest('07_principal_present'):
self.configure_and_assert(test_client, 'principal-present',
grpc.StatusCode.PERMISSION_DENIED)
def test_tls_allow(self) -> None:
self.setupTrafficDirectorGrpc()
self.td.create_authz_policy(action='ALLOW', rules=self.authz_rules())
self.setupSecurityPolicies(server_tls=True,
server_mtls=False,
client_tls=True,
client_mtls=False)
test_server: _XdsTestServer = self.startSecureTestServer()
self.setupServerBackends()
test_client: _XdsTestClient = self.startSecureTestClient(test_server)
with self.subTest('01_host_wildcard'):
self.configure_and_assert(test_client, 'host-wildcard',
grpc.StatusCode.OK)
with self.subTest('02_no_match'):
self.configure_and_assert(test_client, None,
grpc.StatusCode.PERMISSION_DENIED)
with self.subTest('03_principal_present'):
self.configure_and_assert(test_client, 'principal-present',
grpc.StatusCode.PERMISSION_DENIED)
def test_mtls_allow(self) -> None:
self.setupTrafficDirectorGrpc()
self.td.create_authz_policy(action='ALLOW', rules=self.authz_rules())
self.setupSecurityPolicies(server_tls=True,
server_mtls=True,
client_tls=True,
client_mtls=True)
test_server: _XdsTestServer = self.startSecureTestServer()
self.setupServerBackends()
test_client: _XdsTestClient = self.startSecureTestClient(test_server)
with self.subTest('01_host_wildcard'):
self.configure_and_assert(test_client, 'host-wildcard',
grpc.StatusCode.OK)
with self.subTest('02_no_match'):
self.configure_and_assert(test_client, None,
grpc.StatusCode.PERMISSION_DENIED)
# b/202058316
# with self.subTest('03_principal_present'):
# self.configure_and_assert(
# test_client, 'principal-present', grpc.StatusCode.OK)
with self.subTest('04_match_principal'):
self.configure_and_assert(test_client, 'match-principal',
grpc.StatusCode.OK)
with self.subTest('05_never_match_principal'):
self.configure_and_assert(test_client, 'never-match-principal',
grpc.StatusCode.PERMISSION_DENIED)
def test_plaintext_deny(self) -> None:
self.setupTrafficDirectorGrpc()
self.td.create_authz_policy(action='DENY', rules=self.authz_rules())
self.setupSecurityPolicies(server_tls=False,
server_mtls=False,
client_tls=False,
client_mtls=False)
test_server: _XdsTestServer = self.startSecureTestServer()
self.setupServerBackends()
test_client: _XdsTestClient = self.startSecureTestClient(test_server)
with self.subTest('01_host_wildcard'):
self.configure_and_assert(test_client, 'host-wildcard',
grpc.StatusCode.PERMISSION_DENIED)
with self.subTest('02_no_match'):
self.configure_and_assert(test_client, None, grpc.StatusCode.OK)
if __name__ == '__main__':
absltest.main()
Loading…
Cancel
Save